• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19732

12 Mar 2025 05:46AM UTC coverage: 88.509% (-0.02%) from 88.527%
#19732

Pull #11955

github

web-flow
Merge 9c8c8b36c into 2f52a0036
Pull Request #11955: Stops exception allocation on channel shutdown

34570 of 39058 relevant lines covered (88.51%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.68
/../netty/src/main/java/io/grpc/netty/NettyClientHandler.java
1
/*
2
 * Copyright 2014 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.netty;
18

19
import static io.netty.handler.codec.http2.DefaultHttp2LocalFlowController.DEFAULT_WINDOW_UPDATE_RATIO;
20
import static io.netty.util.CharsetUtil.UTF_8;
21
import static io.netty.util.internal.ObjectUtil.checkNotNull;
22

23
import com.google.common.annotations.VisibleForTesting;
24
import com.google.common.base.Preconditions;
25
import com.google.common.base.Stopwatch;
26
import com.google.common.base.Supplier;
27
import com.google.common.base.Ticker;
28
import io.grpc.Attributes;
29
import io.grpc.ChannelLogger;
30
import io.grpc.InternalChannelz;
31
import io.grpc.InternalStatus;
32
import io.grpc.Metadata;
33
import io.grpc.Status;
34
import io.grpc.StatusException;
35
import io.grpc.internal.ClientStreamListener.RpcProgress;
36
import io.grpc.internal.ClientTransport.PingCallback;
37
import io.grpc.internal.GrpcAttributes;
38
import io.grpc.internal.GrpcUtil;
39
import io.grpc.internal.Http2Ping;
40
import io.grpc.internal.InUseStateAggregator;
41
import io.grpc.internal.KeepAliveManager;
42
import io.grpc.internal.TransportTracer;
43
import io.grpc.netty.GrpcHttp2HeadersUtils.GrpcHttp2ClientHeadersDecoder;
44
import io.netty.buffer.ByteBuf;
45
import io.netty.buffer.ByteBufUtil;
46
import io.netty.buffer.Unpooled;
47
import io.netty.channel.Channel;
48
import io.netty.channel.ChannelFuture;
49
import io.netty.channel.ChannelFutureListener;
50
import io.netty.channel.ChannelHandlerContext;
51
import io.netty.channel.ChannelPromise;
52
import io.netty.handler.codec.http2.DecoratingHttp2FrameWriter;
53
import io.netty.handler.codec.http2.DefaultHttp2Connection;
54
import io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder;
55
import io.netty.handler.codec.http2.DefaultHttp2ConnectionEncoder;
56
import io.netty.handler.codec.http2.DefaultHttp2FrameReader;
57
import io.netty.handler.codec.http2.DefaultHttp2FrameWriter;
58
import io.netty.handler.codec.http2.DefaultHttp2HeadersEncoder;
59
import io.netty.handler.codec.http2.DefaultHttp2LocalFlowController;
60
import io.netty.handler.codec.http2.DefaultHttp2RemoteFlowController;
61
import io.netty.handler.codec.http2.Http2CodecUtil;
62
import io.netty.handler.codec.http2.Http2Connection;
63
import io.netty.handler.codec.http2.Http2ConnectionAdapter;
64
import io.netty.handler.codec.http2.Http2ConnectionDecoder;
65
import io.netty.handler.codec.http2.Http2ConnectionEncoder;
66
import io.netty.handler.codec.http2.Http2Error;
67
import io.netty.handler.codec.http2.Http2Exception;
68
import io.netty.handler.codec.http2.Http2FrameAdapter;
69
import io.netty.handler.codec.http2.Http2FrameLogger;
70
import io.netty.handler.codec.http2.Http2FrameReader;
71
import io.netty.handler.codec.http2.Http2FrameWriter;
72
import io.netty.handler.codec.http2.Http2Headers;
73
import io.netty.handler.codec.http2.Http2HeadersDecoder;
74
import io.netty.handler.codec.http2.Http2HeadersEncoder;
75
import io.netty.handler.codec.http2.Http2InboundFrameLogger;
76
import io.netty.handler.codec.http2.Http2OutboundFrameLogger;
77
import io.netty.handler.codec.http2.Http2Settings;
78
import io.netty.handler.codec.http2.Http2Stream;
79
import io.netty.handler.codec.http2.Http2StreamVisitor;
80
import io.netty.handler.codec.http2.StreamBufferingEncoder;
81
import io.netty.handler.codec.http2.UniformStreamByteDistributor;
82
import io.netty.handler.logging.LogLevel;
83
import io.perfmark.PerfMark;
84
import io.perfmark.Tag;
85
import io.perfmark.TaskCloseable;
86
import java.nio.channels.ClosedChannelException;
87
import java.util.LinkedHashMap;
88
import java.util.Map;
89
import java.util.concurrent.Executor;
90
import java.util.logging.Level;
91
import java.util.logging.Logger;
92
import javax.annotation.Nullable;
93

94
/**
95
 * Client-side Netty handler for gRPC processing. All event handlers are executed entirely within
96
 * the context of the Netty Channel thread.
97
 */
98
class NettyClientHandler extends AbstractNettyHandler {
99
  private static final Logger logger = Logger.getLogger(NettyClientHandler.class.getName());
1✔
100
  static boolean enablePerRpcAuthorityCheck =
1✔
101
      GrpcUtil.getFlag("GRPC_ENABLE_PER_RPC_AUTHORITY_CHECK", false);
1✔
102

103
  /**
104
   * A message that simply passes through the channel without any real processing. It is useful to
105
   * check if buffers have been drained and test the health of the channel in a single operation.
106
   */
107
  static final Object NOOP_MESSAGE = new Object();
1✔
108

109
  /**
110
   * Status used when the transport has exhausted the number of streams.
111
   */
112
  private static final Status EXHAUSTED_STREAMS_STATUS =
1✔
113
          Status.UNAVAILABLE.withDescription("Stream IDs have been exhausted");
1✔
114
  private static final long USER_PING_PAYLOAD = 1111;
115

116
  private final Http2Connection.PropertyKey streamKey;
117
  private final ClientTransportLifecycleManager lifecycleManager;
118
  private final KeepAliveManager keepAliveManager;
119
  // Returns new unstarted stopwatches
120
  private final Supplier<Stopwatch> stopwatchFactory;
121
  private final TransportTracer transportTracer;
122
  private final Attributes eagAttributes;
123
  private final String authority;
124
  private final InUseStateAggregator<Http2Stream> inUseState =
1✔
125
      new InUseStateAggregator<Http2Stream>() {
1✔
126
        @Override
127
        protected void handleInUse() {
128
          lifecycleManager.notifyInUse(true);
1✔
129
        }
1✔
130

131
        @Override
132
        protected void handleNotInUse() {
133
          lifecycleManager.notifyInUse(false);
1✔
134
        }
1✔
135
      };
136
  private final Map<String, Status> peerVerificationResults =
1✔
137
      new LinkedHashMap<String, Status>() {
1✔
138
        @Override
139
        protected boolean removeEldestEntry(Map.Entry<String, Status> eldest) {
140
          return size() > 100;
1✔
141
        }
142
      };
143

144
  private WriteQueue clientWriteQueue;
145
  private Http2Ping ping;
146
  private Attributes attributes;
147
  private InternalChannelz.Security securityInfo;
148
  private Status abruptGoAwayStatus;
149
  private Status channelInactiveReason;
150

151
  static NettyClientHandler newHandler(
152
      ClientTransportLifecycleManager lifecycleManager,
153
      @Nullable KeepAliveManager keepAliveManager,
154
      boolean autoFlowControl,
155
      int flowControlWindow,
156
      int maxHeaderListSize,
157
      int softLimitHeaderListSize,
158
      Supplier<Stopwatch> stopwatchFactory,
159
      Runnable tooManyPingsRunnable,
160
      TransportTracer transportTracer,
161
      Attributes eagAttributes,
162
      String authority,
163
      ChannelLogger negotiationLogger,
164
      Ticker ticker) {
165
    Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive");
1✔
166
    Http2HeadersDecoder headersDecoder = new GrpcHttp2ClientHeadersDecoder(maxHeaderListSize);
1✔
167
    Http2FrameReader frameReader = new DefaultHttp2FrameReader(headersDecoder);
1✔
168
    Http2HeadersEncoder encoder = new DefaultHttp2HeadersEncoder(
1✔
169
        Http2HeadersEncoder.NEVER_SENSITIVE, false, 16, Integer.MAX_VALUE);
170
    Http2FrameWriter frameWriter = new DefaultHttp2FrameWriter(encoder);
1✔
171
    Http2Connection connection = new DefaultHttp2Connection(false);
1✔
172
    UniformStreamByteDistributor dist = new UniformStreamByteDistributor(connection);
1✔
173
    dist.minAllocationChunk(MIN_ALLOCATED_CHUNK); // Increased for benchmarks performance.
1✔
174
    DefaultHttp2RemoteFlowController controller =
1✔
175
        new DefaultHttp2RemoteFlowController(connection, dist);
176
    connection.remote().flowController(controller);
1✔
177

178
    return newHandler(
1✔
179
        connection,
180
        frameReader,
181
        frameWriter,
182
        lifecycleManager,
183
        keepAliveManager,
184
        autoFlowControl,
185
        flowControlWindow,
186
        maxHeaderListSize,
187
        softLimitHeaderListSize,
188
        stopwatchFactory,
189
        tooManyPingsRunnable,
190
        transportTracer,
191
        eagAttributes,
192
        authority,
193
        negotiationLogger,
194
        ticker);
195
  }
196

197
  @VisibleForTesting
198
  static NettyClientHandler newHandler(
199
      final Http2Connection connection,
200
      Http2FrameReader frameReader,
201
      Http2FrameWriter frameWriter,
202
      ClientTransportLifecycleManager lifecycleManager,
203
      KeepAliveManager keepAliveManager,
204
      boolean autoFlowControl,
205
      int flowControlWindow,
206
      int maxHeaderListSize,
207
      int softLimitHeaderListSize,
208
      Supplier<Stopwatch> stopwatchFactory,
209
      Runnable tooManyPingsRunnable,
210
      TransportTracer transportTracer,
211
      Attributes eagAttributes,
212
      String authority,
213
      ChannelLogger negotiationLogger,
214
      Ticker ticker) {
215
    Preconditions.checkNotNull(connection, "connection");
1✔
216
    Preconditions.checkNotNull(frameReader, "frameReader");
1✔
217
    Preconditions.checkNotNull(lifecycleManager, "lifecycleManager");
1✔
218
    Preconditions.checkArgument(flowControlWindow > 0, "flowControlWindow must be positive");
1✔
219
    Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive");
1✔
220
    Preconditions.checkArgument(softLimitHeaderListSize > 0,
1✔
221
        "softLimitHeaderListSize must be positive");
222
    Preconditions.checkNotNull(stopwatchFactory, "stopwatchFactory");
1✔
223
    Preconditions.checkNotNull(tooManyPingsRunnable, "tooManyPingsRunnable");
1✔
224
    Preconditions.checkNotNull(eagAttributes, "eagAttributes");
1✔
225
    Preconditions.checkNotNull(authority, "authority");
1✔
226

227
    Http2FrameLogger frameLogger = new Http2FrameLogger(LogLevel.DEBUG, NettyClientHandler.class);
1✔
228
    frameReader = new Http2InboundFrameLogger(frameReader, frameLogger);
1✔
229
    frameWriter = new Http2OutboundFrameLogger(frameWriter, frameLogger);
1✔
230

231
    PingCountingFrameWriter pingCounter;
232
    frameWriter = pingCounter = new PingCountingFrameWriter(frameWriter);
1✔
233

234
    StreamBufferingEncoder encoder =
1✔
235
        new StreamBufferingEncoder(
236
            new DefaultHttp2ConnectionEncoder(connection, frameWriter));
237

238
    // Create the local flow controller configured to auto-refill the connection window.
239
    connection.local().flowController(
1✔
240
        new DefaultHttp2LocalFlowController(connection, DEFAULT_WINDOW_UPDATE_RATIO, true));
241

242
    Http2ConnectionDecoder decoder = new DefaultHttp2ConnectionDecoder(connection, encoder,
1✔
243
        frameReader);
244

245
    transportTracer.setFlowControlWindowReader(new Utils.FlowControlReader(connection));
1✔
246

247
    Http2Settings settings = new Http2Settings();
1✔
248
    settings.pushEnabled(false);
1✔
249
    settings.initialWindowSize(flowControlWindow);
1✔
250
    settings.maxConcurrentStreams(0);
1✔
251
    settings.maxHeaderListSize(maxHeaderListSize);
1✔
252

253
    return new NettyClientHandler(
1✔
254
        decoder,
255
        encoder,
256
        settings,
257
        negotiationLogger,
258
        lifecycleManager,
259
        keepAliveManager,
260
        stopwatchFactory,
261
        tooManyPingsRunnable,
262
        transportTracer,
263
        eagAttributes,
264
        authority,
265
        autoFlowControl,
266
        pingCounter,
267
        ticker,
268
        maxHeaderListSize,
269
        softLimitHeaderListSize);
270
  }
271

272
  /**
   * Stores the collaborators, seeds the transport attributes with the EAG attributes, installs
   * the frame listener on the decoder and registers a connection listener that reacts to GOAWAY
   * frames and to streams becoming active or closed.
   */
  private NettyClientHandler(
      Http2ConnectionDecoder decoder,
      Http2ConnectionEncoder encoder,
      Http2Settings settings,
      ChannelLogger negotiationLogger,
      ClientTransportLifecycleManager lifecycleManager,
      KeepAliveManager keepAliveManager,
      Supplier<Stopwatch> stopwatchFactory,
      final Runnable tooManyPingsRunnable,
      TransportTracer transportTracer,
      Attributes eagAttributes,
      String authority,
      boolean autoFlowControl,
      PingLimiter pingLimiter,
      Ticker ticker,
      int maxHeaderListSize,
      int softLimitHeaderListSize) {
    super(
        /* channelUnused= */ null,
        decoder,
        encoder,
        settings,
        negotiationLogger,
        autoFlowControl,
        pingLimiter,
        ticker,
        maxHeaderListSize,
        softLimitHeaderListSize);
    this.lifecycleManager = lifecycleManager;
    this.keepAliveManager = keepAliveManager;
    this.stopwatchFactory = stopwatchFactory;
    this.transportTracer = Preconditions.checkNotNull(transportTracer);
    this.eagAttributes = eagAttributes;
    this.authority = authority;
    this.attributes =
        Attributes.newBuilder().set(GrpcAttributes.ATTR_CLIENT_EAG_ATTRS, eagAttributes).build();

    // Set the frame listener on the decoder.
    decoder().frameListener(new FrameListener());

    Http2Connection http2Connection = encoder.connection();
    streamKey = http2Connection.newKey();

    http2Connection.addListener(new Http2ConnectionAdapter() {
      @Override
      public void onGoAwayReceived(int lastStreamId, long errorCode, ByteBuf debugData) {
        byte[] debugBytes = ByteBufUtil.getBytes(debugData);
        goingAway(errorCode, debugBytes);
        if (errorCode != Http2Error.ENHANCE_YOUR_CALM.code()) {
          return;
        }
        String debugText = new String(debugBytes, UTF_8);
        logger.log(
            Level.WARNING, "Received GOAWAY with ENHANCE_YOUR_CALM. Debug data: {0}", debugText);
        if ("too_many_pings".equals(debugText)) {
          tooManyPingsRunnable.run();
        }
      }

      @Override
      public void onStreamActive(Http2Stream stream) {
        // Only the first active stream flips the keepalive manager to "active".
        if (connection().numActiveStreams() != 1) {
          return;
        }
        if (NettyClientHandler.this.keepAliveManager != null) {
          NettyClientHandler.this.keepAliveManager.onTransportActive();
        }
      }

      @Override
      public void onStreamClosed(Http2Stream stream) {
        // Although streams with CALL_OPTIONS_RPC_OWNED_BY_BALANCER are not marked as "in-use" in
        // the first place, we don't propagate that option here, and it's safe to reset the in-use
        // state for them, which will be a cheap no-op.
        inUseState.updateObjectInUse(stream, false);
        // Only the last stream closing flips the keepalive manager to "idle".
        if (connection().numActiveStreams() != 0) {
          return;
        }
        if (NettyClientHandler.this.keepAliveManager != null) {
          NettyClientHandler.this.keepAliveManager.onTransportIdle();
        }
      }
    });
  }
1✔
351

352
  /**
353
   * The protocol negotiation attributes, available once the protocol negotiation completes;
354
   * otherwise returns {@code Attributes.EMPTY}.
355
   */
356
  Attributes getAttributes() {
357
    return attributes;
1✔
358
  }
359

360
  /**
361
   * Handler for commands sent from the stream.
362
   */
363
  @Override
364
  public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
365
          throws Exception {
366
    if (msg instanceof CreateStreamCommand) {
1✔
367
      createStream((CreateStreamCommand) msg, promise);
1✔
368
    } else if (msg instanceof SendGrpcFrameCommand) {
1✔
369
      sendGrpcFrame(ctx, (SendGrpcFrameCommand) msg, promise);
1✔
370
    } else if (msg instanceof CancelClientStreamCommand) {
1✔
371
      cancelStream(ctx, (CancelClientStreamCommand) msg, promise);
1✔
372
    } else if (msg instanceof SendPingCommand) {
1✔
373
      sendPingFrame(ctx, (SendPingCommand) msg, promise);
1✔
374
    } else if (msg instanceof GracefulCloseCommand) {
1✔
375
      gracefulClose(ctx, (GracefulCloseCommand) msg, promise);
1✔
376
    } else if (msg instanceof ForcefulCloseCommand) {
1✔
377
      forcefulClose(ctx, (ForcefulCloseCommand) msg, promise);
1✔
378
    } else if (msg == NOOP_MESSAGE) {
1✔
379
      ctx.write(Unpooled.EMPTY_BUFFER, promise);
1✔
380
    } else {
381
      throw new AssertionError("Write called for unexpected type: " + msg.getClass().getName());
×
382
    }
383
  }
1✔
384

385
  void startWriteQueue(Channel channel) {
386
    clientWriteQueue = new WriteQueue(channel);
1✔
387
  }
1✔
388

389
  WriteQueue getWriteQueue() {
390
    return clientWriteQueue;
1✔
391
  }
392

393
  ClientTransportLifecycleManager getLifecycleManager() {
394
    return lifecycleManager;
1✔
395
  }
396

397
  /**
398
   * Returns the given processed bytes back to inbound flow control.
399
   */
400
  void returnProcessedBytes(Http2Stream stream, int bytes) {
401
    try {
402
      decoder().flowController().consumeBytes(stream, bytes);
1✔
403
    } catch (Http2Exception e) {
×
404
      throw new RuntimeException(e);
×
405
    }
1✔
406
  }
1✔
407

408
  private void onHeadersRead(int streamId, Http2Headers headers, boolean endStream) {
409
    // Stream 1 is reserved for the Upgrade response, so we should ignore its headers here:
410
    if (streamId != Http2CodecUtil.HTTP_UPGRADE_STREAM_ID) {
1✔
411
      NettyClientStream.TransportState stream = clientStream(requireHttp2Stream(streamId));
1✔
412
      PerfMark.event("NettyClientHandler.onHeadersRead", stream.tag());
1✔
413
      // check metadata size vs soft limit
414
      int h2HeadersSize = Utils.getH2HeadersSize(headers);
1✔
415
      boolean shouldFail =
1✔
416
          Utils.shouldRejectOnMetadataSizeSoftLimitExceeded(
1✔
417
              h2HeadersSize, softLimitHeaderListSize, maxHeaderListSize);
418
      if (shouldFail && endStream) {
1✔
419
        stream.transportReportStatus(Status.RESOURCE_EXHAUSTED
×
420
            .withDescription(
×
421
                String.format(
×
422
                    "Server Status + Trailers of size %d exceeded Metadata size soft limit: %d",
423
                    h2HeadersSize,
×
424
                    softLimitHeaderListSize)), true, new Metadata());
×
425
        return;
×
426
      } else if (shouldFail) {
1✔
427
        stream.transportReportStatus(Status.RESOURCE_EXHAUSTED
1✔
428
            .withDescription(
1✔
429
                String.format(
1✔
430
                    "Server Headers of size %d exceeded Metadata size soft limit: %d",
431
                    h2HeadersSize,
1✔
432
                    softLimitHeaderListSize)), true, new Metadata());
1✔
433
        return;
1✔
434
      }
435
      stream.transportHeadersReceived(headers, endStream);
1✔
436
    }
437

438
    if (keepAliveManager != null) {
1✔
439
      keepAliveManager.onDataReceived();
1✔
440
    }
441
  }
1✔
442

443
  /**
444
   * Handler for an inbound HTTP/2 DATA frame.
445
   */
446
  private void onDataRead(int streamId, ByteBuf data, int padding, boolean endOfStream) {
447
    flowControlPing().onDataRead(data.readableBytes(), padding);
1✔
448
    NettyClientStream.TransportState stream = clientStream(requireHttp2Stream(streamId));
1✔
449
    PerfMark.event("NettyClientHandler.onDataRead", stream.tag());
1✔
450
    stream.transportDataReceived(data, endOfStream);
1✔
451
    if (keepAliveManager != null) {
1✔
452
      keepAliveManager.onDataReceived();
1✔
453
    }
454
  }
1✔
455

456
  /**
457
   * Handler for an inbound HTTP/2 RST_STREAM frame, terminating a stream.
458
   */
459
  private void onRstStreamRead(int streamId, long errorCode) {
460
    NettyClientStream.TransportState stream = clientStream(connection().stream(streamId));
1✔
461
    if (stream != null) {
1✔
462
      PerfMark.event("NettyClientHandler.onRstStreamRead", stream.tag());
1✔
463
      Status status = statusFromH2Error(null, "RST_STREAM closed stream", errorCode, null);
1✔
464
      stream.transportReportStatus(
1✔
465
          status,
466
          errorCode == Http2Error.REFUSED_STREAM.code()
1✔
467
              ? RpcProgress.REFUSED : RpcProgress.PROCESSED,
1✔
468
          false /*stop delivery*/,
469
          new Metadata());
470
      if (keepAliveManager != null) {
1✔
471
        keepAliveManager.onDataReceived();
1✔
472
      }
473
    }
474
  }
1✔
475

476
  @Override
477
  public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
478
    logger.fine("Network channel being closed by the application.");
1✔
479
    if (ctx.channel().isActive()) { // Ignore notification that the socket was closed
1✔
480
      lifecycleManager.notifyShutdown(
1✔
481
          Status.UNAVAILABLE.withDescription("Transport closed for unknown reason"));
1✔
482
    }
483
    super.close(ctx, promise);
1✔
484
  }
1✔
485

486
  /**
487
   * Handler for the Channel shutting down.
488
   */
489
  @Override
490
  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
491
    try {
492
      logger.fine("Network channel is closed");
1✔
493
      Status status = Status.UNAVAILABLE.withDescription("Network closed for unknown reason");
1✔
494
      lifecycleManager.notifyShutdown(status);
1✔
495
      final Status streamStatus;
496
      if (channelInactiveReason != null) {
1✔
497
        streamStatus = channelInactiveReason;
1✔
498
      } else {
499
        streamStatus = lifecycleManager.getShutdownStatus();
1✔
500
      }
501
      try {
502
        cancelPing(lifecycleManager.getShutdownStatus());
1✔
503
        // Report status to the application layer for any open streams
504
        connection().forEachActiveStream(new Http2StreamVisitor() {
1✔
505
          @Override
506
          public boolean visit(Http2Stream stream) throws Http2Exception {
507
            NettyClientStream.TransportState clientStream = clientStream(stream);
1✔
508
            if (clientStream != null) {
1✔
509
              clientStream.transportReportStatus(streamStatus, false, new Metadata());
1✔
510
            }
511
            return true;
1✔
512
          }
513
        });
514
      } finally {
515
        lifecycleManager.notifyTerminated(status);
1✔
516
      }
517
    } finally {
518
      // Close any open streams
519
      super.channelInactive(ctx);
1✔
520
      if (keepAliveManager != null) {
1✔
521
        keepAliveManager.onTransportTermination();
1✔
522
      }
523
    }
524
  }
1✔
525

526
  @Override
527
  public void handleProtocolNegotiationCompleted(
528
      Attributes attributes, InternalChannelz.Security securityInfo) {
529
    this.attributes = this.attributes.toBuilder().setAll(attributes).build();
1✔
530
    this.securityInfo = securityInfo;
1✔
531
    super.handleProtocolNegotiationCompleted(attributes, securityInfo);
1✔
532
    writeBufferingAndRemove(ctx().channel());
1✔
533
  }
1✔
534

535
  static void writeBufferingAndRemove(Channel channel) {
536
    checkNotNull(channel, "channel");
1✔
537
    ChannelHandlerContext handlerCtx =
1✔
538
        channel.pipeline().context(WriteBufferingAndExceptionHandler.class);
1✔
539
    if (handlerCtx == null) {
1✔
540
      return;
1✔
541
    }
542
    ((WriteBufferingAndExceptionHandler) handlerCtx.handler()).writeBufferedAndRemove(handlerCtx);
1✔
543
  }
1✔
544

545
  @Override
546
  public Attributes getEagAttributes() {
547
    return eagAttributes;
1✔
548
  }
549

550
  @Override
551
  public String getAuthority() {
552
    return authority;
1✔
553
  }
554

555
  InternalChannelz.Security getSecurityInfo() {
556
    return securityInfo;
1✔
557
  }
558

559
  @Override
560
  protected void onConnectionError(ChannelHandlerContext ctx,  boolean outbound, Throwable cause,
561
      Http2Exception http2Ex) {
562
    logger.log(Level.FINE, "Caught a connection error", cause);
1✔
563
    lifecycleManager.notifyShutdown(Utils.statusFromThrowable(cause));
1✔
564
    // Parent class will shut down the Channel
565
    super.onConnectionError(ctx, outbound, cause, http2Ex);
1✔
566
  }
1✔
567

568
  @Override
569
  protected void onStreamError(ChannelHandlerContext ctx, boolean outbound, Throwable cause,
570
      Http2Exception.StreamException http2Ex) {
571
    // Close the stream with a status that contains the cause.
572
    NettyClientStream.TransportState stream = clientStream(connection().stream(http2Ex.streamId()));
1✔
573
    if (stream != null) {
1✔
574
      stream.transportReportStatus(Utils.statusFromThrowable(cause), false, new Metadata());
1✔
575
    } else {
576
      logger.log(Level.FINE, "Stream error for unknown stream " + http2Ex.streamId(), cause);
1✔
577
    }
578

579
    // Delegate to the base class to send a RST_STREAM.
580
    super.onStreamError(ctx, outbound, cause, http2Ex);
1✔
581
  }
1✔
582

583
  @Override
584
  protected boolean isGracefulShutdownComplete() {
585
    // Only allow graceful shutdown to complete after all pending streams have completed.
586
    return super.isGracefulShutdownComplete()
1✔
587
        && ((StreamBufferingEncoder) encoder()).numBufferedStreams() == 0;
1✔
588
  }
589

590
  /**
591
   * Attempts to create a new stream from the given command. If there are too many active streams,
592
   * the creation request is queued.
593
   */
594
  private void createStream(CreateStreamCommand command, ChannelPromise promise)
595
          throws Exception {
596
    if (lifecycleManager.getShutdownStatus() != null) {
1✔
597
      command.stream().setNonExistent();
1✔
598
      // The connection is going away (it is really the GOAWAY case),
599
      // just terminate the stream now.
600
      command.stream().transportReportStatus(
1✔
601
          lifecycleManager.getShutdownStatus(), RpcProgress.MISCARRIED, true, new Metadata());
1✔
602
      promise.setFailure(InternalStatus.asRuntimeExceptionWithoutStacktrace(
1✔
603
              lifecycleManager.getShutdownStatus(), null));
1✔
604
      return;
1✔
605
    }
606

607
    CharSequence authorityHeader = command.headers().authority();
1✔
608
    if (authorityHeader == null) {
1✔
609
      Status authorityVerificationStatus = Status.UNAVAILABLE.withDescription(
1✔
610
              "Missing authority header");
611
      command.stream().setNonExistent();
1✔
612
      command.stream().transportReportStatus(
1✔
613
              Status.UNAVAILABLE, RpcProgress.PROCESSED, true, new Metadata());
614
      promise.setFailure(InternalStatus.asRuntimeExceptionWithoutStacktrace(
1✔
615
              authorityVerificationStatus, null));
616
      return;
1✔
617
    }
618
    // No need to verify authority for the rpc outgoing header if it is same as the authority
619
    // for the transport
620
    if (!authority.contentEquals(authorityHeader)) {
1✔
621
      Status authorityVerificationStatus = peerVerificationResults.get(
1✔
622
              authorityHeader.toString());
1✔
623
      if (authorityVerificationStatus == null) {
1✔
624
        if (attributes.get(GrpcAttributes.ATTR_AUTHORITY_VERIFIER) == null) {
1✔
625
          authorityVerificationStatus = Status.UNAVAILABLE.withDescription(
1✔
626
                  "Authority verifier not found to verify authority");
627
          command.stream().setNonExistent();
1✔
628
          command.stream().transportReportStatus(
1✔
629
                  authorityVerificationStatus, RpcProgress.PROCESSED, true, new Metadata());
630
          promise.setFailure(InternalStatus.asRuntimeExceptionWithoutStacktrace(
1✔
631
                  authorityVerificationStatus, null));
632
          return;
1✔
633
        }
634
        authorityVerificationStatus = attributes.get(GrpcAttributes.ATTR_AUTHORITY_VERIFIER)
1✔
635
                .verifyAuthority(authorityHeader.toString());
1✔
636
        peerVerificationResults.put(authorityHeader.toString(), authorityVerificationStatus);
1✔
637
        if (!authorityVerificationStatus.isOk() && !enablePerRpcAuthorityCheck) {
1✔
638
          logger.log(Level.WARNING, String.format("%s.%s",
1✔
639
                          authorityVerificationStatus.getDescription(),
1✔
640
                          enablePerRpcAuthorityCheck
1✔
641
                                  ? "" : " This will be an error in the future."),
1✔
642
                  InternalStatus.asRuntimeExceptionWithoutStacktrace(
1✔
643
                          authorityVerificationStatus, null));
644
        }
645
      }
646
      if (!authorityVerificationStatus.isOk()) {
1✔
647
        if (enablePerRpcAuthorityCheck) {
1✔
648
          command.stream().setNonExistent();
1✔
649
          command.stream().transportReportStatus(
1✔
650
                  authorityVerificationStatus, RpcProgress.PROCESSED, true, new Metadata());
651
          promise.setFailure(InternalStatus.asRuntimeExceptionWithoutStacktrace(
1✔
652
                  authorityVerificationStatus, null));
653
          return;
1✔
654
        }
655
      }
656
    }
657
    // Get the stream ID for the new stream.
658
    int streamId;
659
    try {
660
      streamId = incrementAndGetNextStreamId();
1✔
661
    } catch (StatusException e) {
1✔
662
      command.stream().setNonExistent();
1✔
663
      // Stream IDs have been exhausted for this connection. Fail the promise immediately.
664
      promise.setFailure(e);
1✔
665

666
      // Initiate a graceful shutdown if we haven't already.
667
      if (!connection().goAwaySent()) {
1✔
668
        logger.fine("Stream IDs have been exhausted for this connection. "
1✔
669
                + "Initiating graceful shutdown of the connection.");
670
        lifecycleManager.notifyShutdown(e.getStatus());
1✔
671
        close(ctx(), ctx().newPromise());
1✔
672
      }
673
      return;
1✔
674
    }
1✔
675
    if (connection().goAwayReceived()) {
1✔
676
      Status s = abruptGoAwayStatus;
1✔
677
      int maxActiveStreams = connection().local().maxActiveStreams();
1✔
678
      int lastStreamId = connection().local().lastStreamKnownByPeer();
1✔
679
      if (s == null) {
1✔
680
        // Should be impossible, but handle pseudo-gracefully
681
        s = Status.INTERNAL.withDescription(
×
682
            "Failed due to abrupt GOAWAY, but can't find GOAWAY details");
683
      } else if (streamId > lastStreamId) {
1✔
684
        s = s.augmentDescription(
1✔
685
            "stream id: " + streamId + ", GOAWAY Last-Stream-ID:" + lastStreamId);
686
      } else if (connection().local().numActiveStreams() == maxActiveStreams) {
1✔
687
        s = s.augmentDescription("At MAX_CONCURRENT_STREAMS limit. limit: " + maxActiveStreams);
1✔
688
      }
689
      if (streamId > lastStreamId || connection().local().numActiveStreams() == maxActiveStreams) {
1✔
690
        // This should only be reachable during onGoAwayReceived, as otherwise
691
        // getShutdownThrowable() != null
692
        command.stream().setNonExistent();
1✔
693
        command.stream().transportReportStatus(s, RpcProgress.MISCARRIED, true, new Metadata());
1✔
694
        promise.setFailure(s.asRuntimeException());
1✔
695
        return;
1✔
696
      }
697
    }
698

699
    NettyClientStream.TransportState stream = command.stream();
1✔
700
    Http2Headers headers = command.headers();
1✔
701
    stream.setId(streamId);
1✔
702

703
    try (TaskCloseable ignore = PerfMark.traceTask("NettyClientHandler.createStream")) {
1✔
704
      PerfMark.linkIn(command.getLink());
1✔
705
      PerfMark.attachTag(stream.tag());
1✔
706
      createStreamTraced(
1✔
707
          streamId, stream, headers, command.isGet(), command.shouldBeCountedForInUse(), promise);
1✔
708
    }
709
  }
1✔
710

711
  private void createStreamTraced(
712
      final int streamId,
713
      final NettyClientStream.TransportState stream,
714
      final Http2Headers headers,
715
      boolean isGet,
716
      final boolean shouldBeCountedForInUse,
717
      final ChannelPromise promise) {
718
    // Create an intermediate promise so that we can intercept the failure reported back to the
719
    // application.
720
    ChannelPromise tempPromise = ctx().newPromise();
1✔
721
    encoder().writeHeaders(ctx(), streamId, headers, 0, isGet, tempPromise)
1✔
722
        .addListener(new ChannelFutureListener() {
1✔
723
          @Override
724
          public void operationComplete(ChannelFuture future) throws Exception {
725
            if (future.isSuccess()) {
1✔
726
              // The http2Stream will be null in case a stream buffered in the encoder
727
              // was canceled via RST_STREAM.
728
              Http2Stream http2Stream = connection().stream(streamId);
1✔
729
              if (http2Stream != null) {
1✔
730
                stream.getStatsTraceContext().clientOutboundHeaders();
1✔
731
                http2Stream.setProperty(streamKey, stream);
1✔
732

733
                // This delays the in-use state until the I/O completes, which technically may
734
                // be later than we would like.
735
                if (shouldBeCountedForInUse) {
1✔
736
                  inUseState.updateObjectInUse(http2Stream, true);
1✔
737
                }
738

739
                // Attach the client stream to the HTTP/2 stream object as user data.
740
                stream.setHttp2Stream(http2Stream);
1✔
741
              }
742
              // Otherwise, the stream has been cancelled and Netty is sending a
743
              // RST_STREAM frame which causes it to purge pending writes from the
744
              // flow-controller and delete the http2Stream. The stream listener has already
745
              // been notified of cancellation so there is nothing to do.
746

747
              // Just forward on the success status to the original promise.
748
              promise.setSuccess();
1✔
749
            } else {
1✔
750
              Throwable cause = future.cause();
1✔
751
              if (cause instanceof StreamBufferingEncoder.Http2GoAwayException) {
1✔
752
                StreamBufferingEncoder.Http2GoAwayException e =
1✔
753
                    (StreamBufferingEncoder.Http2GoAwayException) cause;
754
                Status status = statusFromH2Error(
1✔
755
                    Status.Code.UNAVAILABLE, "GOAWAY closed buffered stream",
756
                    e.errorCode(), e.debugData());
1✔
757
                cause = status.asRuntimeException();
1✔
758
                stream.transportReportStatus(status, RpcProgress.MISCARRIED, true, new Metadata());
1✔
759
              } else if (cause instanceof StreamBufferingEncoder.Http2ChannelClosedException) {
1✔
760
                Status status = lifecycleManager.getShutdownStatus();
1✔
761
                if (status == null) {
1✔
762
                  status = Status.UNAVAILABLE.withCause(cause)
×
763
                      .withDescription("Connection closed while stream is buffered");
×
764
                }
765
                stream.transportReportStatus(status, RpcProgress.MISCARRIED, true, new Metadata());
1✔
766
              }
767
              promise.setFailure(cause);
1✔
768
            }
769
          }
1✔
770
        });
771
  }
1✔
772

773
  /**
774
   * Cancels this stream.
775
   */
776
  private void cancelStream(ChannelHandlerContext ctx, CancelClientStreamCommand cmd,
777
      ChannelPromise promise) {
778
    NettyClientStream.TransportState stream = cmd.stream();
1✔
779
    try (TaskCloseable ignore = PerfMark.traceTask("NettyClientHandler.cancelStream")) {
1✔
780
      PerfMark.attachTag(stream.tag());
1✔
781
      PerfMark.linkIn(cmd.getLink());
1✔
782
      Status reason = cmd.reason();
1✔
783
      if (reason != null) {
1✔
784
        stream.transportReportStatus(reason, true, new Metadata());
1✔
785
      }
786
      if (!cmd.stream().isNonExistent()) {
1✔
787
        encoder().writeRstStream(ctx, stream.id(), Http2Error.CANCEL.code(), promise);
1✔
788
      } else {
789
        promise.setSuccess();
1✔
790
      }
791
    }
792
  }
1✔
793

794
  /**
795
   * Sends the given GRPC frame for the stream.
796
   */
797
  private void sendGrpcFrame(ChannelHandlerContext ctx, SendGrpcFrameCommand cmd,
798
      ChannelPromise promise) {
799
    try (TaskCloseable ignore = PerfMark.traceTask("NettyClientHandler.sendGrpcFrame")) {
1✔
800
      PerfMark.attachTag(cmd.stream().tag());
1✔
801
      PerfMark.linkIn(cmd.getLink());
1✔
802
      // Call the base class to write the HTTP/2 DATA frame.
803
      // Note: no need to flush since this is handled by the outbound flow controller.
804
      encoder().writeData(ctx, cmd.stream().id(), cmd.content(), 0, cmd.endStream(), promise);
1✔
805
    }
806
  }
1✔
807

808
  private void sendPingFrame(ChannelHandlerContext ctx, SendPingCommand msg,
809
      ChannelPromise promise) {
810
    try (TaskCloseable ignore = PerfMark.traceTask("NettyClientHandler.sendPingFrame")) {
1✔
811
      PerfMark.linkIn(msg.getLink());
1✔
812
      sendPingFrameTraced(ctx, msg, promise);
1✔
813
    }
814
  }
1✔
815

816
  /**
817
   * Sends a PING frame. If a ping operation is already outstanding, the callback in the message is
818
   * registered to be called when the existing operation completes, and no new frame is sent.
819
   */
820
  private void sendPingFrameTraced(ChannelHandlerContext ctx, SendPingCommand msg,
821
      ChannelPromise promise) {
822
    // Don't check lifecycleManager.getShutdownStatus() since we want to allow pings after shutdown
823
    // but before termination. After termination, messages will no longer arrive because the
824
    // pipeline clears all handlers on channel close.
825

826
    PingCallback callback = msg.callback();
1✔
827
    Executor executor = msg.executor();
1✔
828
    // we only allow one outstanding ping at a time, so just add the callback to
829
    // any outstanding operation
830
    if (ping != null) {
1✔
831
      promise.setSuccess();
1✔
832
      ping.addCallback(callback, executor);
1✔
833
      return;
1✔
834
    }
835

836
    // Use a new promise to prevent calling the callback twice on write failure: here and in
837
    // NettyClientTransport.ping(). It may appear strange, but it will behave the same as if
838
    // ping != null above.
839
    promise.setSuccess();
1✔
840
    promise = ctx().newPromise();
1✔
841
    // set outstanding operation
842
    long data = USER_PING_PAYLOAD;
1✔
843
    Stopwatch stopwatch = stopwatchFactory.get();
1✔
844
    stopwatch.start();
1✔
845
    ping = new Http2Ping(data, stopwatch);
1✔
846
    ping.addCallback(callback, executor);
1✔
847
    // and then write the ping
848
    encoder().writePing(ctx, false, USER_PING_PAYLOAD, promise);
1✔
849
    ctx.flush();
1✔
850
    final Http2Ping finalPing = ping;
1✔
851
    promise.addListener(new ChannelFutureListener() {
1✔
852
      @Override
853
      public void operationComplete(ChannelFuture future) throws Exception {
854
        if (future.isSuccess()) {
1✔
855
          transportTracer.reportKeepAliveSent();
1✔
856
          return;
1✔
857
        }
858
        Throwable cause = future.cause();
×
859
        Status status = lifecycleManager.getShutdownStatus();
×
860
        if (cause instanceof ClosedChannelException && status == null) {
×
861
            status = Status.UNKNOWN.withDescription("Ping failed but for unknown reason.")
×
862
                .withCause(future.cause());
×
863
        } else {
864
          status = Utils.statusFromThrowable(cause);
×
865
        }
866
        finalPing.failed(status);
×
867
        if (ping == finalPing) {
×
868
          ping = null;
×
869
        }
870
      }
×
871
    });
872
  }
1✔
873

874
  private void gracefulClose(ChannelHandlerContext ctx, GracefulCloseCommand msg,
875
      ChannelPromise promise) throws Exception {
876
    lifecycleManager.notifyShutdown(msg.getStatus());
1✔
877
    // Explicitly flush to create any buffered streams before sending GOAWAY.
878
    // TODO(ejona): determine if the need to flush is a bug in Netty
879
    flush(ctx);
1✔
880
    close(ctx, promise);
1✔
881
  }
1✔
882

883
  private void forcefulClose(final ChannelHandlerContext ctx, final ForcefulCloseCommand msg,
884
      ChannelPromise promise) throws Exception {
885
    connection().forEachActiveStream(new Http2StreamVisitor() {
1✔
886
      @Override
887
      public boolean visit(Http2Stream stream) throws Http2Exception {
888
        NettyClientStream.TransportState clientStream = clientStream(stream);
1✔
889
        Tag tag = clientStream != null ? clientStream.tag() : PerfMark.createTag();
1✔
890
        try (TaskCloseable ignore = PerfMark.traceTask("NettyClientHandler.forcefulClose")) {
1✔
891
          PerfMark.linkIn(msg.getLink());
1✔
892
          PerfMark.attachTag(tag);
1✔
893
          if (clientStream != null) {
1✔
894
            clientStream.transportReportStatus(msg.getStatus(), true, new Metadata());
1✔
895
            resetStream(ctx, stream.id(), Http2Error.CANCEL.code(), ctx.newPromise());
1✔
896
          }
897
          stream.close();
1✔
898
          return true;
1✔
899
        }
900
      }
901
    });
902
    close(ctx, promise);
1✔
903
  }
1✔
904

905
  /**
906
   * Handler for a GOAWAY being received. Fails any streams created after the
907
   * last known stream. May only be called during a read.
908
   */
909
  private void goingAway(long errorCode, byte[] debugData) {
910
    Status finalStatus = statusFromH2Error(
1✔
911
        Status.Code.UNAVAILABLE, "GOAWAY shut down transport", errorCode, debugData);
912
    lifecycleManager.notifyGracefulShutdown(finalStatus);
1✔
913
    abruptGoAwayStatus = statusFromH2Error(
1✔
914
        Status.Code.UNAVAILABLE, "Abrupt GOAWAY closed unsent stream", errorCode, debugData);
915
    // While this _should_ be UNAVAILABLE, Netty uses the wrong stream id in the GOAWAY when it
916
    // fails streams due to HPACK failures (e.g., header list too large). To be more conservative,
917
    // we assume any sent streams may be related to the GOAWAY. This should rarely impact users
918
    // since the main time servers should use abrupt GOAWAYs is if there is a protocol error, and if
919
    // there wasn't a protocol error the error code was probably NO_ERROR which is mapped to
920
    // UNAVAILABLE. https://github.com/netty/netty/issues/10670
921
    final Status abruptGoAwayStatusConservative = statusFromH2Error(
1✔
922
        null, "Abrupt GOAWAY closed sent stream", errorCode, debugData);
923
    final boolean mayBeHittingNettyBug = errorCode != Http2Error.NO_ERROR.code();
1✔
924
    // Try to allocate as many in-flight streams as possible, to reduce race window of
925
    // https://github.com/grpc/grpc-java/issues/2562 . To be of any help, the server has to
926
    // gracefully shut down the connection with two GOAWAYs. gRPC servers generally send a PING
927
    // after the first GOAWAY, so they can very precisely detect when the GOAWAY has been
928
    // processed and thus this processing must be in-line before processing additional reads.
929

930
    // This can cause reentrancy, but should be minor since it is normal to handle writes in
931
    // response to a read. Also, the call stack is rather shallow at this point
932
    clientWriteQueue.drainNow();
1✔
933
    if (lifecycleManager.notifyShutdown(finalStatus)) {
1✔
934
      // This is for the only RPCs that are actually covered by the GOAWAY error code. All other
935
      // RPCs were not observed by the remote and so should be UNAVAILABLE.
936
      channelInactiveReason = statusFromH2Error(
1✔
937
          null, "Connection closed after GOAWAY", errorCode, debugData);
938
    }
939

940
    final int lastKnownStream = connection().local().lastStreamKnownByPeer();
1✔
941
    try {
942
      connection().forEachActiveStream(new Http2StreamVisitor() {
1✔
943
        @Override
944
        public boolean visit(Http2Stream stream) throws Http2Exception {
945
          if (stream.id() > lastKnownStream) {
1✔
946
            NettyClientStream.TransportState clientStream = clientStream(stream);
1✔
947
            if (clientStream != null) {
1✔
948
              // RpcProgress _should_ be REFUSED, but are being conservative. See comment for
949
              // abruptGoAwayStatusConservative. This does reduce our ability to perform transparent
950
              // retries, but only if something else caused a connection failure.
951
              RpcProgress progress = mayBeHittingNettyBug
1✔
952
                  ? RpcProgress.PROCESSED
1✔
953
                  : RpcProgress.REFUSED;
1✔
954
              clientStream.transportReportStatus(
1✔
955
                  abruptGoAwayStatusConservative, progress, false, new Metadata());
956
            }
957
            stream.close();
1✔
958
          }
959
          return true;
1✔
960
        }
961
      });
962
    } catch (Http2Exception e) {
×
963
      throw new RuntimeException(e);
×
964
    }
1✔
965
  }
1✔
966

967
  private void cancelPing(Status s) {
968
    if (ping != null) {
1✔
969
      ping.failed(s);
1✔
970
      ping = null;
1✔
971
    }
972
  }
1✔
973

974
  /** If {@code statusCode} is non-null, it will be used instead of the http2 error code mapping. */
975
  private Status statusFromH2Error(
976
      Status.Code statusCode, String context, long errorCode, byte[] debugData) {
977
    Status status = GrpcUtil.Http2Error.statusForCode(errorCode);
1✔
978
    if (statusCode == null) {
1✔
979
      statusCode = status.getCode();
1✔
980
    }
981
    String debugString = "";
1✔
982
    if (debugData != null && debugData.length > 0) {
1✔
983
      // If a debug message was provided, use it.
984
      debugString = ", debug data: " + new String(debugData, UTF_8);
1✔
985
    }
986
    return statusCode.toStatus()
1✔
987
        .withDescription(context + ". " + status.getDescription() + debugString);
1✔
988
  }
989

990
  /**
991
   * Gets the client stream associated to the given HTTP/2 stream object.
992
   */
993
  private NettyClientStream.TransportState clientStream(Http2Stream stream) {
994
    return stream == null ? null : (NettyClientStream.TransportState) stream.getProperty(streamKey);
1✔
995
  }
996

997
  private int incrementAndGetNextStreamId() throws StatusException {
998
    int nextStreamId = connection().local().incrementAndGetNextStreamId();
1✔
999
    if (nextStreamId < 0) {
1✔
1000
      logger.fine("Stream IDs have been exhausted for this connection. "
1✔
1001
              + "Initiating graceful shutdown of the connection.");
1002
      throw EXHAUSTED_STREAMS_STATUS.asException();
1✔
1003
    }
1004
    return nextStreamId;
1✔
1005
  }
1006

1007
  private Http2Stream requireHttp2Stream(int streamId) {
1008
    Http2Stream stream = connection().stream(streamId);
1✔
1009
    if (stream == null) {
1✔
1010
      // This should never happen.
1011
      throw new AssertionError("Stream does not exist: " + streamId);
×
1012
    }
1013
    return stream;
1✔
1014
  }
1015

1016
  private class FrameListener extends Http2FrameAdapter {
1✔
1017
    private boolean firstSettings = true;
1✔
1018

1019
    @Override
1020
    public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) {
1021
      if (firstSettings) {
1✔
1022
        firstSettings = false;
1✔
1023
        attributes = lifecycleManager.filterAttributes(attributes);
1✔
1024
        lifecycleManager.notifyReady();
1✔
1025
      }
1026
    }
1✔
1027

1028
    @Override
1029
    public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
1030
        boolean endOfStream) throws Http2Exception {
1031
      NettyClientHandler.this.onDataRead(streamId, data, padding, endOfStream);
1✔
1032
      return padding;
1✔
1033
    }
1034

1035
    @Override
1036
    public void onHeadersRead(ChannelHandlerContext ctx,
1037
        int streamId,
1038
        Http2Headers headers,
1039
        int streamDependency,
1040
        short weight,
1041
        boolean exclusive,
1042
        int padding,
1043
        boolean endStream) throws Http2Exception {
1044
      NettyClientHandler.this.onHeadersRead(streamId, headers, endStream);
1✔
1045
    }
1✔
1046

1047
    @Override
1048
    public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode)
1049
        throws Http2Exception {
1050
      NettyClientHandler.this.onRstStreamRead(streamId, errorCode);
1✔
1051
    }
1✔
1052

1053
    @Override
1054
    public void onPingAckRead(ChannelHandlerContext ctx, long ackPayload) throws Http2Exception {
1055
      Http2Ping p = ping;
1✔
1056
      if (ackPayload == flowControlPing().payload()) {
1✔
1057
        flowControlPing().updateWindow();
1✔
1058
        logger.log(Level.FINE, "Window: {0}",
1✔
1059
            decoder().flowController().initialWindowSize(connection().connectionStream()));
1✔
1060
      } else if (p != null) {
1✔
1061
        if (p.payload() == ackPayload) {
1✔
1062
          p.complete();
1✔
1063
          ping = null;
1✔
1064
        } else {
1065
          logger.log(Level.WARNING,
1✔
1066
              "Received unexpected ping ack. Expecting {0}, got {1}",
1067
              new Object[] {p.payload(), ackPayload});
1✔
1068
        }
1069
      } else {
1070
        logger.warning("Received unexpected ping ack. No ping outstanding");
×
1071
      }
1072
      if (keepAliveManager != null) {
1✔
1073
        keepAliveManager.onDataReceived();
1✔
1074
      }
1075
    }
1✔
1076

1077
    @Override
1078
    public void onPingRead(ChannelHandlerContext ctx, long data) throws Http2Exception {
1079
      if (keepAliveManager != null) {
1✔
1080
        keepAliveManager.onDataReceived();
×
1081
      }
1082
    }
1✔
1083
  }
1084

1085
  private static class PingCountingFrameWriter extends DecoratingHttp2FrameWriter
1086
      implements AbstractNettyHandler.PingLimiter {
1087
    private int pingCount;
1088

1089
    public PingCountingFrameWriter(Http2FrameWriter delegate) {
1090
      super(delegate);
1✔
1091
    }
1✔
1092

1093
    @Override
1094
    public boolean isPingAllowed() {
1095
      // "3 strikes" may cause the server to complain, so we limit ourselves to 2 or below.
1096
      return pingCount < 2;
1✔
1097
    }
1098

1099
    @Override
1100
    public ChannelFuture writeHeaders(
1101
        ChannelHandlerContext ctx, int streamId, Http2Headers headers,
1102
        int padding, boolean endStream, ChannelPromise promise) {
1103
      pingCount = 0;
×
1104
      return super.writeHeaders(ctx, streamId, headers, padding, endStream, promise);
×
1105
    }
1106

1107
    @Override
1108
    public ChannelFuture writeHeaders(
1109
        ChannelHandlerContext ctx, int streamId, Http2Headers headers,
1110
        int streamDependency, short weight, boolean exclusive,
1111
        int padding, boolean endStream, ChannelPromise promise) {
1112
      pingCount = 0;
1✔
1113
      return super.writeHeaders(ctx, streamId, headers, streamDependency, weight, exclusive,
1✔
1114
          padding, endStream, promise);
1115
    }
1116

1117
    @Override
1118
    public ChannelFuture writeWindowUpdate(
1119
        ChannelHandlerContext ctx, int streamId, int windowSizeIncrement, ChannelPromise promise) {
1120
      pingCount = 0;
1✔
1121
      return super.writeWindowUpdate(ctx, streamId, windowSizeIncrement, promise);
1✔
1122
    }
1123

1124
    @Override
1125
    public ChannelFuture writePing(
1126
        ChannelHandlerContext ctx, boolean ack, long data, ChannelPromise promise) {
1127
      if (!ack) {
1✔
1128
        pingCount++;
1✔
1129
      }
1130
      return super.writePing(ctx, ack, data, promise);
1✔
1131
    }
1132

1133
    @Override
1134
    public ChannelFuture writeData(
1135
        ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endStream,
1136
        ChannelPromise promise) {
1137
      if (data.isReadable()) {
1✔
1138
        pingCount = 0;
1✔
1139
      }
1140
      return super.writeData(ctx, streamId, data, padding, endStream, promise);
1✔
1141
    }
1142
  }
1143
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc