• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19711

24 Feb 2025 02:58PM UTC coverage: 88.545% (-0.08%) from 88.622%
#19711

push

github

web-flow
netty: Per-rpc authority verification against peer cert subject names (#11724)

Per-rpc verification of authority specified via call options or set by the LB API against peer cert's subject names.

34437 of 38892 relevant lines covered (88.55%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.87
/../netty/src/main/java/io/grpc/netty/NettyClientHandler.java
1
/*
2
 * Copyright 2014 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.netty;
18

19
import static io.netty.handler.codec.http2.DefaultHttp2LocalFlowController.DEFAULT_WINDOW_UPDATE_RATIO;
20
import static io.netty.util.CharsetUtil.UTF_8;
21
import static io.netty.util.internal.ObjectUtil.checkNotNull;
22

23
import com.google.common.annotations.VisibleForTesting;
24
import com.google.common.base.Preconditions;
25
import com.google.common.base.Stopwatch;
26
import com.google.common.base.Supplier;
27
import com.google.common.base.Ticker;
28
import io.grpc.Attributes;
29
import io.grpc.ChannelLogger;
30
import io.grpc.InternalChannelz;
31
import io.grpc.InternalStatus;
32
import io.grpc.Metadata;
33
import io.grpc.Status;
34
import io.grpc.StatusException;
35
import io.grpc.internal.ClientStreamListener.RpcProgress;
36
import io.grpc.internal.ClientTransport.PingCallback;
37
import io.grpc.internal.GrpcAttributes;
38
import io.grpc.internal.GrpcUtil;
39
import io.grpc.internal.Http2Ping;
40
import io.grpc.internal.InUseStateAggregator;
41
import io.grpc.internal.KeepAliveManager;
42
import io.grpc.internal.TransportTracer;
43
import io.grpc.netty.GrpcHttp2HeadersUtils.GrpcHttp2ClientHeadersDecoder;
44
import io.netty.buffer.ByteBuf;
45
import io.netty.buffer.ByteBufUtil;
46
import io.netty.buffer.Unpooled;
47
import io.netty.channel.Channel;
48
import io.netty.channel.ChannelFuture;
49
import io.netty.channel.ChannelFutureListener;
50
import io.netty.channel.ChannelHandlerContext;
51
import io.netty.channel.ChannelPromise;
52
import io.netty.handler.codec.http2.DecoratingHttp2FrameWriter;
53
import io.netty.handler.codec.http2.DefaultHttp2Connection;
54
import io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder;
55
import io.netty.handler.codec.http2.DefaultHttp2ConnectionEncoder;
56
import io.netty.handler.codec.http2.DefaultHttp2FrameReader;
57
import io.netty.handler.codec.http2.DefaultHttp2FrameWriter;
58
import io.netty.handler.codec.http2.DefaultHttp2HeadersEncoder;
59
import io.netty.handler.codec.http2.DefaultHttp2LocalFlowController;
60
import io.netty.handler.codec.http2.DefaultHttp2RemoteFlowController;
61
import io.netty.handler.codec.http2.Http2CodecUtil;
62
import io.netty.handler.codec.http2.Http2Connection;
63
import io.netty.handler.codec.http2.Http2ConnectionAdapter;
64
import io.netty.handler.codec.http2.Http2ConnectionDecoder;
65
import io.netty.handler.codec.http2.Http2ConnectionEncoder;
66
import io.netty.handler.codec.http2.Http2Error;
67
import io.netty.handler.codec.http2.Http2Exception;
68
import io.netty.handler.codec.http2.Http2FrameAdapter;
69
import io.netty.handler.codec.http2.Http2FrameLogger;
70
import io.netty.handler.codec.http2.Http2FrameReader;
71
import io.netty.handler.codec.http2.Http2FrameWriter;
72
import io.netty.handler.codec.http2.Http2Headers;
73
import io.netty.handler.codec.http2.Http2HeadersDecoder;
74
import io.netty.handler.codec.http2.Http2HeadersEncoder;
75
import io.netty.handler.codec.http2.Http2InboundFrameLogger;
76
import io.netty.handler.codec.http2.Http2OutboundFrameLogger;
77
import io.netty.handler.codec.http2.Http2Settings;
78
import io.netty.handler.codec.http2.Http2Stream;
79
import io.netty.handler.codec.http2.Http2StreamVisitor;
80
import io.netty.handler.codec.http2.StreamBufferingEncoder;
81
import io.netty.handler.codec.http2.WeightedFairQueueByteDistributor;
82
import io.netty.handler.logging.LogLevel;
83
import io.perfmark.PerfMark;
84
import io.perfmark.Tag;
85
import io.perfmark.TaskCloseable;
86
import java.nio.channels.ClosedChannelException;
87
import java.util.LinkedHashMap;
88
import java.util.Map;
89
import java.util.concurrent.Executor;
90
import java.util.logging.Level;
91
import java.util.logging.Logger;
92
import javax.annotation.Nullable;
93

94
/**
95
 * Client-side Netty handler for GRPC processing. All event handlers are executed entirely within
96
 * the context of the Netty Channel thread.
97
 */
98
class NettyClientHandler extends AbstractNettyHandler {
99
  private static final Logger logger = Logger.getLogger(NettyClientHandler.class.getName());
1✔
100
  static boolean enablePerRpcAuthorityCheck =
1✔
101
      GrpcUtil.getFlag("GRPC_ENABLE_PER_RPC_AUTHORITY_CHECK", false);
1✔
102

103
  /**
104
   * A message that simply passes through the channel without any real processing. It is useful to
105
   * check if buffers have been drained and test the health of the channel in a single operation.
106
   */
107
  static final Object NOOP_MESSAGE = new Object();
1✔
108

109
  /**
110
   * Status used when the transport has exhausted the number of streams.
111
   */
112
  private static final Status EXHAUSTED_STREAMS_STATUS =
1✔
113
          Status.UNAVAILABLE.withDescription("Stream IDs have been exhausted");
1✔
114
  private static final long USER_PING_PAYLOAD = 1111;
115

116
  private final Http2Connection.PropertyKey streamKey;
117
  private final ClientTransportLifecycleManager lifecycleManager;
118
  private final KeepAliveManager keepAliveManager;
119
  // Returns new unstarted stopwatches
120
  private final Supplier<Stopwatch> stopwatchFactory;
121
  private final TransportTracer transportTracer;
122
  private final Attributes eagAttributes;
123
  private final String authority;
124
  private final InUseStateAggregator<Http2Stream> inUseState =
1✔
125
      new InUseStateAggregator<Http2Stream>() {
1✔
126
        @Override
127
        protected void handleInUse() {
128
          lifecycleManager.notifyInUse(true);
1✔
129
        }
1✔
130

131
        @Override
132
        protected void handleNotInUse() {
133
          lifecycleManager.notifyInUse(false);
1✔
134
        }
1✔
135
      };
136
  private final Map<String, Status> peerVerificationResults =
1✔
137
      new LinkedHashMap<String, Status>() {
1✔
138
        @Override
139
        protected boolean removeEldestEntry(Map.Entry<String, Status> eldest) {
140
          return size() > 100;
1✔
141
        }
142
      };
143

144
  private WriteQueue clientWriteQueue;
145
  private Http2Ping ping;
146
  private Attributes attributes;
147
  private InternalChannelz.Security securityInfo;
148
  private Status abruptGoAwayStatus;
149
  private Status channelInactiveReason;
150

151
  static NettyClientHandler newHandler(
152
      ClientTransportLifecycleManager lifecycleManager,
153
      @Nullable KeepAliveManager keepAliveManager,
154
      boolean autoFlowControl,
155
      int flowControlWindow,
156
      int maxHeaderListSize,
157
      int softLimitHeaderListSize,
158
      Supplier<Stopwatch> stopwatchFactory,
159
      Runnable tooManyPingsRunnable,
160
      TransportTracer transportTracer,
161
      Attributes eagAttributes,
162
      String authority,
163
      ChannelLogger negotiationLogger,
164
      Ticker ticker) {
165
    Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive");
1✔
166
    Http2HeadersDecoder headersDecoder = new GrpcHttp2ClientHeadersDecoder(maxHeaderListSize);
1✔
167
    Http2FrameReader frameReader = new DefaultHttp2FrameReader(headersDecoder);
1✔
168
    Http2HeadersEncoder encoder = new DefaultHttp2HeadersEncoder(
1✔
169
        Http2HeadersEncoder.NEVER_SENSITIVE, false, 16, Integer.MAX_VALUE);
170
    Http2FrameWriter frameWriter = new DefaultHttp2FrameWriter(encoder);
1✔
171
    Http2Connection connection = new DefaultHttp2Connection(false);
1✔
172
    WeightedFairQueueByteDistributor dist = new WeightedFairQueueByteDistributor(connection);
1✔
173
    dist.allocationQuantum(16 * 1024); // Make benchmarks fast again.
1✔
174
    DefaultHttp2RemoteFlowController controller =
1✔
175
        new DefaultHttp2RemoteFlowController(connection, dist);
176
    connection.remote().flowController(controller);
1✔
177

178
    return newHandler(
1✔
179
        connection,
180
        frameReader,
181
        frameWriter,
182
        lifecycleManager,
183
        keepAliveManager,
184
        autoFlowControl,
185
        flowControlWindow,
186
        maxHeaderListSize,
187
        softLimitHeaderListSize,
188
        stopwatchFactory,
189
        tooManyPingsRunnable,
190
        transportTracer,
191
        eagAttributes,
192
        authority,
193
        negotiationLogger,
194
        ticker);
195
  }
196

197
  @VisibleForTesting
198
  static NettyClientHandler newHandler(
199
      final Http2Connection connection,
200
      Http2FrameReader frameReader,
201
      Http2FrameWriter frameWriter,
202
      ClientTransportLifecycleManager lifecycleManager,
203
      KeepAliveManager keepAliveManager,
204
      boolean autoFlowControl,
205
      int flowControlWindow,
206
      int maxHeaderListSize,
207
      int softLimitHeaderListSize,
208
      Supplier<Stopwatch> stopwatchFactory,
209
      Runnable tooManyPingsRunnable,
210
      TransportTracer transportTracer,
211
      Attributes eagAttributes,
212
      String authority,
213
      ChannelLogger negotiationLogger,
214
      Ticker ticker) {
215
    Preconditions.checkNotNull(connection, "connection");
1✔
216
    Preconditions.checkNotNull(frameReader, "frameReader");
1✔
217
    Preconditions.checkNotNull(lifecycleManager, "lifecycleManager");
1✔
218
    Preconditions.checkArgument(flowControlWindow > 0, "flowControlWindow must be positive");
1✔
219
    Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive");
1✔
220
    Preconditions.checkArgument(softLimitHeaderListSize > 0,
1✔
221
        "softLimitHeaderListSize must be positive");
222
    Preconditions.checkNotNull(stopwatchFactory, "stopwatchFactory");
1✔
223
    Preconditions.checkNotNull(tooManyPingsRunnable, "tooManyPingsRunnable");
1✔
224
    Preconditions.checkNotNull(eagAttributes, "eagAttributes");
1✔
225
    Preconditions.checkNotNull(authority, "authority");
1✔
226

227
    Http2FrameLogger frameLogger = new Http2FrameLogger(LogLevel.DEBUG, NettyClientHandler.class);
1✔
228
    frameReader = new Http2InboundFrameLogger(frameReader, frameLogger);
1✔
229
    frameWriter = new Http2OutboundFrameLogger(frameWriter, frameLogger);
1✔
230

231
    PingCountingFrameWriter pingCounter;
232
    frameWriter = pingCounter = new PingCountingFrameWriter(frameWriter);
1✔
233

234
    StreamBufferingEncoder encoder =
1✔
235
        new StreamBufferingEncoder(
236
            new DefaultHttp2ConnectionEncoder(connection, frameWriter));
237

238
    // Create the local flow controller configured to auto-refill the connection window.
239
    connection.local().flowController(
1✔
240
        new DefaultHttp2LocalFlowController(connection, DEFAULT_WINDOW_UPDATE_RATIO, true));
241

242
    Http2ConnectionDecoder decoder = new DefaultHttp2ConnectionDecoder(connection, encoder,
1✔
243
        frameReader);
244

245
    transportTracer.setFlowControlWindowReader(new Utils.FlowControlReader(connection));
1✔
246

247
    Http2Settings settings = new Http2Settings();
1✔
248
    settings.pushEnabled(false);
1✔
249
    settings.initialWindowSize(flowControlWindow);
1✔
250
    settings.maxConcurrentStreams(0);
1✔
251
    settings.maxHeaderListSize(maxHeaderListSize);
1✔
252

253
    return new NettyClientHandler(
1✔
254
        decoder,
255
        encoder,
256
        settings,
257
        negotiationLogger,
258
        lifecycleManager,
259
        keepAliveManager,
260
        stopwatchFactory,
261
        tooManyPingsRunnable,
262
        transportTracer,
263
        eagAttributes,
264
        authority,
265
        autoFlowControl,
266
        pingCounter,
267
        ticker,
268
        maxHeaderListSize,
269
        softLimitHeaderListSize);
270
  }
271

272
  private NettyClientHandler(
273
      Http2ConnectionDecoder decoder,
274
      Http2ConnectionEncoder encoder,
275
      Http2Settings settings,
276
      ChannelLogger negotiationLogger,
277
      ClientTransportLifecycleManager lifecycleManager,
278
      KeepAliveManager keepAliveManager,
279
      Supplier<Stopwatch> stopwatchFactory,
280
      final Runnable tooManyPingsRunnable,
281
      TransportTracer transportTracer,
282
      Attributes eagAttributes,
283
      String authority,
284
      boolean autoFlowControl,
285
      PingLimiter pingLimiter,
286
      Ticker ticker,
287
      int maxHeaderListSize,
288
      int softLimitHeaderListSize) {
289
    super(
1✔
290
        /* channelUnused= */ null,
291
        decoder,
292
        encoder,
293
        settings,
294
        negotiationLogger,
295
        autoFlowControl,
296
        pingLimiter,
297
        ticker,
298
        maxHeaderListSize,
299
        softLimitHeaderListSize);
300
    this.lifecycleManager = lifecycleManager;
1✔
301
    this.keepAliveManager = keepAliveManager;
1✔
302
    this.stopwatchFactory = stopwatchFactory;
1✔
303
    this.transportTracer = Preconditions.checkNotNull(transportTracer);
1✔
304
    this.eagAttributes = eagAttributes;
1✔
305
    this.authority = authority;
1✔
306
    this.attributes = Attributes.newBuilder()
1✔
307
        .set(GrpcAttributes.ATTR_CLIENT_EAG_ATTRS, eagAttributes).build();
1✔
308

309
    // Set the frame listener on the decoder.
310
    decoder().frameListener(new FrameListener());
1✔
311

312
    Http2Connection connection = encoder.connection();
1✔
313
    streamKey = connection.newKey();
1✔
314

315
    connection.addListener(new Http2ConnectionAdapter() {
1✔
316
      @Override
317
      public void onGoAwayReceived(int lastStreamId, long errorCode, ByteBuf debugData) {
318
        byte[] debugDataBytes = ByteBufUtil.getBytes(debugData);
1✔
319
        goingAway(errorCode, debugDataBytes);
1✔
320
        if (errorCode == Http2Error.ENHANCE_YOUR_CALM.code()) {
1✔
321
          String data = new String(debugDataBytes, UTF_8);
1✔
322
          logger.log(
1✔
323
              Level.WARNING, "Received GOAWAY with ENHANCE_YOUR_CALM. Debug data: {0}", data);
324
          if ("too_many_pings".equals(data)) {
1✔
325
            tooManyPingsRunnable.run();
1✔
326
          }
327
        }
328
      }
1✔
329

330
      @Override
331
      public void onStreamActive(Http2Stream stream) {
332
        if (connection().numActiveStreams() == 1
1✔
333
            && NettyClientHandler.this.keepAliveManager != null) {
1✔
334
          NettyClientHandler.this.keepAliveManager.onTransportActive();
1✔
335
        }
336
      }
1✔
337

338
      @Override
339
      public void onStreamClosed(Http2Stream stream) {
340
        // Although streams with CALL_OPTIONS_RPC_OWNED_BY_BALANCER are not marked as "in-use" in
341
        // the first place, we don't propagate that option here, and it's safe to reset the in-use
342
        // state for them, which will be a cheap no-op.
343
        inUseState.updateObjectInUse(stream, false);
1✔
344
        if (connection().numActiveStreams() == 0
1✔
345
            && NettyClientHandler.this.keepAliveManager != null) {
1✔
346
          NettyClientHandler.this.keepAliveManager.onTransportIdle();
1✔
347
        }
348
      }
1✔
349
    });
350
  }
1✔
351

352
  /**
353
   * The protocol negotiation attributes, available once the protocol negotiation completes;
354
   * otherwise returns {@code Attributes.EMPTY}.
355
   */
356
  Attributes getAttributes() {
357
    return attributes;
1✔
358
  }
359

360
  /**
361
   * Handler for commands sent from the stream.
362
   */
363
  @Override
364
  public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
365
          throws Exception {
366
    if (msg instanceof CreateStreamCommand) {
1✔
367
      createStream((CreateStreamCommand) msg, promise);
1✔
368
    } else if (msg instanceof SendGrpcFrameCommand) {
1✔
369
      sendGrpcFrame(ctx, (SendGrpcFrameCommand) msg, promise);
1✔
370
    } else if (msg instanceof CancelClientStreamCommand) {
1✔
371
      cancelStream(ctx, (CancelClientStreamCommand) msg, promise);
1✔
372
    } else if (msg instanceof SendPingCommand) {
1✔
373
      sendPingFrame(ctx, (SendPingCommand) msg, promise);
1✔
374
    } else if (msg instanceof GracefulCloseCommand) {
1✔
375
      gracefulClose(ctx, (GracefulCloseCommand) msg, promise);
1✔
376
    } else if (msg instanceof ForcefulCloseCommand) {
1✔
377
      forcefulClose(ctx, (ForcefulCloseCommand) msg, promise);
1✔
378
    } else if (msg == NOOP_MESSAGE) {
1✔
379
      ctx.write(Unpooled.EMPTY_BUFFER, promise);
1✔
380
    } else {
381
      throw new AssertionError("Write called for unexpected type: " + msg.getClass().getName());
×
382
    }
383
  }
1✔
384

385
  void startWriteQueue(Channel channel) {
386
    clientWriteQueue = new WriteQueue(channel);
1✔
387
  }
1✔
388

389
  WriteQueue getWriteQueue() {
390
    return clientWriteQueue;
1✔
391
  }
392

393
  ClientTransportLifecycleManager getLifecycleManager() {
394
    return lifecycleManager;
1✔
395
  }
396

397
  /**
398
   * Returns the given processed bytes back to inbound flow control.
399
   */
400
  void returnProcessedBytes(Http2Stream stream, int bytes) {
401
    try {
402
      decoder().flowController().consumeBytes(stream, bytes);
1✔
403
    } catch (Http2Exception e) {
×
404
      throw new RuntimeException(e);
×
405
    }
1✔
406
  }
1✔
407

408
  private void onHeadersRead(int streamId, Http2Headers headers, boolean endStream) {
409
    // Stream 1 is reserved for the Upgrade response, so we should ignore its headers here:
410
    if (streamId != Http2CodecUtil.HTTP_UPGRADE_STREAM_ID) {
1✔
411
      NettyClientStream.TransportState stream = clientStream(requireHttp2Stream(streamId));
1✔
412
      PerfMark.event("NettyClientHandler.onHeadersRead", stream.tag());
1✔
413
      // check metadata size vs soft limit
414
      int h2HeadersSize = Utils.getH2HeadersSize(headers);
1✔
415
      boolean shouldFail =
1✔
416
          Utils.shouldRejectOnMetadataSizeSoftLimitExceeded(
1✔
417
              h2HeadersSize, softLimitHeaderListSize, maxHeaderListSize);
418
      if (shouldFail && endStream) {
1✔
419
        stream.transportReportStatus(Status.RESOURCE_EXHAUSTED
×
420
            .withDescription(
×
421
                String.format(
×
422
                    "Server Status + Trailers of size %d exceeded Metadata size soft limit: %d",
423
                    h2HeadersSize,
×
424
                    softLimitHeaderListSize)), true, new Metadata());
×
425
        return;
×
426
      } else if (shouldFail) {
1✔
427
        stream.transportReportStatus(Status.RESOURCE_EXHAUSTED
1✔
428
            .withDescription(
1✔
429
                String.format(
1✔
430
                    "Server Headers of size %d exceeded Metadata size soft limit: %d",
431
                    h2HeadersSize,
1✔
432
                    softLimitHeaderListSize)), true, new Metadata());
1✔
433
        return;
1✔
434
      }
435
      stream.transportHeadersReceived(headers, endStream);
1✔
436
    }
437

438
    if (keepAliveManager != null) {
1✔
439
      keepAliveManager.onDataReceived();
1✔
440
    }
441
  }
1✔
442

443
  /**
444
   * Handler for an inbound HTTP/2 DATA frame.
445
   */
446
  private void onDataRead(int streamId, ByteBuf data, int padding, boolean endOfStream) {
447
    flowControlPing().onDataRead(data.readableBytes(), padding);
1✔
448
    NettyClientStream.TransportState stream = clientStream(requireHttp2Stream(streamId));
1✔
449
    PerfMark.event("NettyClientHandler.onDataRead", stream.tag());
1✔
450
    stream.transportDataReceived(data, endOfStream);
1✔
451
    if (keepAliveManager != null) {
1✔
452
      keepAliveManager.onDataReceived();
1✔
453
    }
454
  }
1✔
455

456
  /**
457
   * Handler for an inbound HTTP/2 RST_STREAM frame, terminating a stream.
458
   */
459
  private void onRstStreamRead(int streamId, long errorCode) {
460
    NettyClientStream.TransportState stream = clientStream(connection().stream(streamId));
1✔
461
    if (stream != null) {
1✔
462
      PerfMark.event("NettyClientHandler.onRstStreamRead", stream.tag());
1✔
463
      Status status = statusFromH2Error(null, "RST_STREAM closed stream", errorCode, null);
1✔
464
      stream.transportReportStatus(
1✔
465
          status,
466
          errorCode == Http2Error.REFUSED_STREAM.code()
1✔
467
              ? RpcProgress.REFUSED : RpcProgress.PROCESSED,
1✔
468
          false /*stop delivery*/,
469
          new Metadata());
470
      if (keepAliveManager != null) {
1✔
471
        keepAliveManager.onDataReceived();
1✔
472
      }
473
    }
474
  }
1✔
475

476
  @Override
477
  public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
478
    logger.fine("Network channel being closed by the application.");
1✔
479
    if (ctx.channel().isActive()) { // Ignore notification that the socket was closed
1✔
480
      lifecycleManager.notifyShutdown(
1✔
481
          Status.UNAVAILABLE.withDescription("Transport closed for unknown reason"));
1✔
482
    }
483
    super.close(ctx, promise);
1✔
484
  }
1✔
485

486
  /**
487
   * Handler for the Channel shutting down.
488
   */
489
  @Override
490
  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
491
    try {
492
      logger.fine("Network channel is closed");
1✔
493
      Status status = Status.UNAVAILABLE.withDescription("Network closed for unknown reason");
1✔
494
      lifecycleManager.notifyShutdown(status);
1✔
495
      final Status streamStatus;
496
      if (channelInactiveReason != null) {
1✔
497
        streamStatus = channelInactiveReason;
1✔
498
      } else {
499
        streamStatus = lifecycleManager.getShutdownStatus();
1✔
500
      }
501
      try {
502
        cancelPing(lifecycleManager.getShutdownThrowable());
1✔
503
        // Report status to the application layer for any open streams
504
        connection().forEachActiveStream(new Http2StreamVisitor() {
1✔
505
          @Override
506
          public boolean visit(Http2Stream stream) throws Http2Exception {
507
            NettyClientStream.TransportState clientStream = clientStream(stream);
1✔
508
            if (clientStream != null) {
1✔
509
              clientStream.transportReportStatus(streamStatus, false, new Metadata());
1✔
510
            }
511
            return true;
1✔
512
          }
513
        });
514
      } finally {
515
        lifecycleManager.notifyTerminated(status);
1✔
516
      }
517
    } finally {
518
      // Close any open streams
519
      super.channelInactive(ctx);
1✔
520
      if (keepAliveManager != null) {
1✔
521
        keepAliveManager.onTransportTermination();
1✔
522
      }
523
    }
524
  }
1✔
525

526
  @Override
527
  public void handleProtocolNegotiationCompleted(
528
      Attributes attributes, InternalChannelz.Security securityInfo) {
529
    this.attributes = this.attributes.toBuilder().setAll(attributes).build();
1✔
530
    this.securityInfo = securityInfo;
1✔
531
    super.handleProtocolNegotiationCompleted(attributes, securityInfo);
1✔
532
    writeBufferingAndRemove(ctx().channel());
1✔
533
  }
1✔
534

535
  static void writeBufferingAndRemove(Channel channel) {
536
    checkNotNull(channel, "channel");
1✔
537
    ChannelHandlerContext handlerCtx =
1✔
538
        channel.pipeline().context(WriteBufferingAndExceptionHandler.class);
1✔
539
    if (handlerCtx == null) {
1✔
540
      return;
1✔
541
    }
542
    ((WriteBufferingAndExceptionHandler) handlerCtx.handler()).writeBufferedAndRemove(handlerCtx);
1✔
543
  }
1✔
544

545
  @Override
546
  public Attributes getEagAttributes() {
547
    return eagAttributes;
1✔
548
  }
549

550
  @Override
551
  public String getAuthority() {
552
    return authority;
1✔
553
  }
554

555
  InternalChannelz.Security getSecurityInfo() {
556
    return securityInfo;
1✔
557
  }
558

559
  @Override
560
  protected void onConnectionError(ChannelHandlerContext ctx,  boolean outbound, Throwable cause,
561
      Http2Exception http2Ex) {
562
    logger.log(Level.FINE, "Caught a connection error", cause);
1✔
563
    lifecycleManager.notifyShutdown(Utils.statusFromThrowable(cause));
1✔
564
    // Parent class will shut down the Channel
565
    super.onConnectionError(ctx, outbound, cause, http2Ex);
1✔
566
  }
1✔
567

568
  @Override
569
  protected void onStreamError(ChannelHandlerContext ctx, boolean outbound, Throwable cause,
570
      Http2Exception.StreamException http2Ex) {
571
    // Close the stream with a status that contains the cause.
572
    NettyClientStream.TransportState stream = clientStream(connection().stream(http2Ex.streamId()));
1✔
573
    if (stream != null) {
1✔
574
      stream.transportReportStatus(Utils.statusFromThrowable(cause), false, new Metadata());
1✔
575
    } else {
576
      logger.log(Level.FINE, "Stream error for unknown stream " + http2Ex.streamId(), cause);
1✔
577
    }
578

579
    // Delegate to the base class to send a RST_STREAM.
580
    super.onStreamError(ctx, outbound, cause, http2Ex);
1✔
581
  }
1✔
582

583
  @Override
584
  protected boolean isGracefulShutdownComplete() {
585
    // Only allow graceful shutdown to complete after all pending streams have completed.
586
    return super.isGracefulShutdownComplete()
1✔
587
        && ((StreamBufferingEncoder) encoder()).numBufferedStreams() == 0;
1✔
588
  }
589

590
  /**
591
   * Attempts to create a new stream from the given command. If there are too many active streams,
592
   * the creation request is queued.
593
   */
594
  private void createStream(CreateStreamCommand command, ChannelPromise promise)
595
          throws Exception {
596
    if (lifecycleManager.getShutdownThrowable() != null) {
1✔
597
      command.stream().setNonExistent();
1✔
598
      // The connection is going away (it is really the GOAWAY case),
599
      // just terminate the stream now.
600
      command.stream().transportReportStatus(
1✔
601
          lifecycleManager.getShutdownStatus(), RpcProgress.MISCARRIED, true, new Metadata());
1✔
602
      promise.setFailure(lifecycleManager.getShutdownThrowable());
1✔
603
      return;
1✔
604
    }
605

606
    CharSequence authorityHeader = command.headers().authority();
1✔
607
    if (authorityHeader == null) {
1✔
608
      Status authorityVerificationStatus = Status.UNAVAILABLE.withDescription(
1✔
609
              "Missing authority header");
610
      command.stream().setNonExistent();
1✔
611
      command.stream().transportReportStatus(
1✔
612
              Status.UNAVAILABLE, RpcProgress.PROCESSED, true, new Metadata());
613
      promise.setFailure(InternalStatus.asRuntimeExceptionWithoutStacktrace(
1✔
614
              authorityVerificationStatus, null));
615
      return;
1✔
616
    }
617
    // No need to verify authority for the rpc outgoing header if it is same as the authority
618
    // for the transport
619
    if (!authority.contentEquals(authorityHeader)) {
1✔
620
      Status authorityVerificationStatus = peerVerificationResults.get(
1✔
621
              authorityHeader.toString());
1✔
622
      if (authorityVerificationStatus == null) {
1✔
623
        if (attributes.get(GrpcAttributes.ATTR_AUTHORITY_VERIFIER) == null) {
1✔
624
          authorityVerificationStatus = Status.UNAVAILABLE.withDescription(
1✔
625
                  "Authority verifier not found to verify authority");
626
          command.stream().setNonExistent();
1✔
627
          command.stream().transportReportStatus(
1✔
628
                  authorityVerificationStatus, RpcProgress.PROCESSED, true, new Metadata());
629
          promise.setFailure(InternalStatus.asRuntimeExceptionWithoutStacktrace(
1✔
630
                  authorityVerificationStatus, null));
631
          return;
1✔
632
        }
633
        authorityVerificationStatus = attributes.get(GrpcAttributes.ATTR_AUTHORITY_VERIFIER)
1✔
634
                .verifyAuthority(authorityHeader.toString());
1✔
635
        peerVerificationResults.put(authorityHeader.toString(), authorityVerificationStatus);
1✔
636
        if (!authorityVerificationStatus.isOk() && !enablePerRpcAuthorityCheck) {
1✔
637
          logger.log(Level.WARNING, String.format("%s.%s",
1✔
638
                          authorityVerificationStatus.getDescription(),
1✔
639
                          enablePerRpcAuthorityCheck
1✔
640
                                  ? "" : " This will be an error in the future."),
1✔
641
                  InternalStatus.asRuntimeExceptionWithoutStacktrace(
1✔
642
                          authorityVerificationStatus, null));
643
        }
644
      }
645
      if (!authorityVerificationStatus.isOk()) {
1✔
646
        if (enablePerRpcAuthorityCheck) {
1✔
647
          command.stream().setNonExistent();
1✔
648
          command.stream().transportReportStatus(
1✔
649
                  authorityVerificationStatus, RpcProgress.PROCESSED, true, new Metadata());
650
          promise.setFailure(InternalStatus.asRuntimeExceptionWithoutStacktrace(
1✔
651
                  authorityVerificationStatus, null));
652
          return;
1✔
653
        }
654
      }
655
    }
656
    // Get the stream ID for the new stream.
657
    int streamId;
658
    try {
659
      streamId = incrementAndGetNextStreamId();
1✔
660
    } catch (StatusException e) {
1✔
661
      command.stream().setNonExistent();
1✔
662
      // Stream IDs have been exhausted for this connection. Fail the promise immediately.
663
      promise.setFailure(e);
1✔
664

665
      // Initiate a graceful shutdown if we haven't already.
666
      if (!connection().goAwaySent()) {
1✔
667
        logger.fine("Stream IDs have been exhausted for this connection. "
1✔
668
                + "Initiating graceful shutdown of the connection.");
669
        lifecycleManager.notifyShutdown(e.getStatus());
1✔
670
        close(ctx(), ctx().newPromise());
1✔
671
      }
672
      return;
1✔
673
    }
1✔
674
    if (connection().goAwayReceived()) {
1✔
675
      Status s = abruptGoAwayStatus;
1✔
676
      int maxActiveStreams = connection().local().maxActiveStreams();
1✔
677
      int lastStreamId = connection().local().lastStreamKnownByPeer();
1✔
678
      if (s == null) {
1✔
679
        // Should be impossible, but handle pseudo-gracefully
680
        s = Status.INTERNAL.withDescription(
×
681
            "Failed due to abrupt GOAWAY, but can't find GOAWAY details");
682
      } else if (streamId > lastStreamId) {
1✔
683
        s = s.augmentDescription(
1✔
684
            "stream id: " + streamId + ", GOAWAY Last-Stream-ID:" + lastStreamId);
685
      } else if (connection().local().numActiveStreams() == maxActiveStreams) {
1✔
686
        s = s.augmentDescription("At MAX_CONCURRENT_STREAMS limit. limit: " + maxActiveStreams);
1✔
687
      }
688
      if (streamId > lastStreamId || connection().local().numActiveStreams() == maxActiveStreams) {
1✔
689
        // This should only be reachable during onGoAwayReceived, as otherwise
690
        // getShutdownThrowable() != null
691
        command.stream().setNonExistent();
1✔
692
        command.stream().transportReportStatus(s, RpcProgress.MISCARRIED, true, new Metadata());
1✔
693
        promise.setFailure(s.asRuntimeException());
1✔
694
        return;
1✔
695
      }
696
    }
697

698
    NettyClientStream.TransportState stream = command.stream();
1✔
699
    Http2Headers headers = command.headers();
1✔
700
    stream.setId(streamId);
1✔
701

702
    try (TaskCloseable ignore = PerfMark.traceTask("NettyClientHandler.createStream")) {
1✔
703
      PerfMark.linkIn(command.getLink());
1✔
704
      PerfMark.attachTag(stream.tag());
1✔
705
      createStreamTraced(
1✔
706
          streamId, stream, headers, command.isGet(), command.shouldBeCountedForInUse(), promise);
1✔
707
    }
708
  }
1✔
709

710
  private void createStreamTraced(
711
      final int streamId,
712
      final NettyClientStream.TransportState stream,
713
      final Http2Headers headers,
714
      boolean isGet,
715
      final boolean shouldBeCountedForInUse,
716
      final ChannelPromise promise) {
717
    // Create an intermediate promise so that we can intercept the failure reported back to the
718
    // application.
719
    ChannelPromise tempPromise = ctx().newPromise();
1✔
720
    encoder().writeHeaders(ctx(), streamId, headers, 0, isGet, tempPromise)
1✔
721
        .addListener(new ChannelFutureListener() {
1✔
722
          @Override
723
          public void operationComplete(ChannelFuture future) throws Exception {
724
            if (future.isSuccess()) {
1✔
725
              // The http2Stream will be null in case a stream buffered in the encoder
726
              // was canceled via RST_STREAM.
727
              Http2Stream http2Stream = connection().stream(streamId);
1✔
728
              if (http2Stream != null) {
1✔
729
                stream.getStatsTraceContext().clientOutboundHeaders();
1✔
730
                http2Stream.setProperty(streamKey, stream);
1✔
731

732
                // This delays the in-use state until the I/O completes, which technically may
733
                // be later than we would like.
734
                if (shouldBeCountedForInUse) {
1✔
735
                  inUseState.updateObjectInUse(http2Stream, true);
1✔
736
                }
737

738
                // Attach the client stream to the HTTP/2 stream object as user data.
739
                stream.setHttp2Stream(http2Stream);
1✔
740
              }
741
              // Otherwise, the stream has been cancelled and Netty is sending a
742
              // RST_STREAM frame which causes it to purge pending writes from the
743
              // flow-controller and delete the http2Stream. The stream listener has already
744
              // been notified of cancellation so there is nothing to do.
745

746
              // Just forward on the success status to the original promise.
747
              promise.setSuccess();
1✔
748
            } else {
1✔
749
              Throwable cause = future.cause();
1✔
750
              if (cause instanceof StreamBufferingEncoder.Http2GoAwayException) {
1✔
751
                StreamBufferingEncoder.Http2GoAwayException e =
1✔
752
                    (StreamBufferingEncoder.Http2GoAwayException) cause;
753
                Status status = statusFromH2Error(
1✔
754
                    Status.Code.UNAVAILABLE, "GOAWAY closed buffered stream",
755
                    e.errorCode(), e.debugData());
1✔
756
                cause = status.asRuntimeException();
1✔
757
                stream.transportReportStatus(status, RpcProgress.MISCARRIED, true, new Metadata());
1✔
758
              } else if (cause instanceof StreamBufferingEncoder.Http2ChannelClosedException) {
1✔
759
                Status status = lifecycleManager.getShutdownStatus();
1✔
760
                if (status == null) {
1✔
761
                  status = Status.UNAVAILABLE.withCause(cause)
×
762
                      .withDescription("Connection closed while stream is buffered");
×
763
                }
764
                stream.transportReportStatus(status, RpcProgress.MISCARRIED, true, new Metadata());
1✔
765
              }
766
              promise.setFailure(cause);
1✔
767
            }
768
          }
1✔
769
        });
770
  }
1✔
771

772
  /**
773
   * Cancels this stream.
774
   */
775
  private void cancelStream(ChannelHandlerContext ctx, CancelClientStreamCommand cmd,
776
      ChannelPromise promise) {
777
    NettyClientStream.TransportState stream = cmd.stream();
1✔
778
    try (TaskCloseable ignore = PerfMark.traceTask("NettyClientHandler.cancelStream")) {
1✔
779
      PerfMark.attachTag(stream.tag());
1✔
780
      PerfMark.linkIn(cmd.getLink());
1✔
781
      Status reason = cmd.reason();
1✔
782
      if (reason != null) {
1✔
783
        stream.transportReportStatus(reason, true, new Metadata());
1✔
784
      }
785
      if (!cmd.stream().isNonExistent()) {
1✔
786
        encoder().writeRstStream(ctx, stream.id(), Http2Error.CANCEL.code(), promise);
1✔
787
      } else {
788
        promise.setSuccess();
1✔
789
      }
790
    }
791
  }
1✔
792

793
  /**
794
   * Sends the given GRPC frame for the stream.
795
   */
796
  private void sendGrpcFrame(ChannelHandlerContext ctx, SendGrpcFrameCommand cmd,
797
      ChannelPromise promise) {
798
    try (TaskCloseable ignore = PerfMark.traceTask("NettyClientHandler.sendGrpcFrame")) {
1✔
799
      PerfMark.attachTag(cmd.stream().tag());
1✔
800
      PerfMark.linkIn(cmd.getLink());
1✔
801
      // Call the base class to write the HTTP/2 DATA frame.
802
      // Note: no need to flush since this is handled by the outbound flow controller.
803
      encoder().writeData(ctx, cmd.stream().id(), cmd.content(), 0, cmd.endStream(), promise);
1✔
804
    }
805
  }
1✔
806

807
  private void sendPingFrame(ChannelHandlerContext ctx, SendPingCommand msg,
808
      ChannelPromise promise) {
809
    try (TaskCloseable ignore = PerfMark.traceTask("NettyClientHandler.sendPingFrame")) {
1✔
810
      PerfMark.linkIn(msg.getLink());
1✔
811
      sendPingFrameTraced(ctx, msg, promise);
1✔
812
    }
813
  }
1✔
814

815
  /**
816
   * Sends a PING frame. If a ping operation is already outstanding, the callback in the message is
817
   * registered to be called when the existing operation completes, and no new frame is sent.
818
   */
819
  private void sendPingFrameTraced(ChannelHandlerContext ctx, SendPingCommand msg,
820
      ChannelPromise promise) {
821
    // Don't check lifecycleManager.getShutdownStatus() since we want to allow pings after shutdown
822
    // but before termination. After termination, messages will no longer arrive because the
823
    // pipeline clears all handlers on channel close.
824

825
    PingCallback callback = msg.callback();
1✔
826
    Executor executor = msg.executor();
1✔
827
    // we only allow one outstanding ping at a time, so just add the callback to
828
    // any outstanding operation
829
    if (ping != null) {
1✔
830
      promise.setSuccess();
1✔
831
      ping.addCallback(callback, executor);
1✔
832
      return;
1✔
833
    }
834

835
    // Use a new promise to prevent calling the callback twice on write failure: here and in
836
    // NettyClientTransport.ping(). It may appear strange, but it will behave the same as if
837
    // ping != null above.
838
    promise.setSuccess();
1✔
839
    promise = ctx().newPromise();
1✔
840
    // set outstanding operation
841
    long data = USER_PING_PAYLOAD;
1✔
842
    Stopwatch stopwatch = stopwatchFactory.get();
1✔
843
    stopwatch.start();
1✔
844
    ping = new Http2Ping(data, stopwatch);
1✔
845
    ping.addCallback(callback, executor);
1✔
846
    // and then write the ping
847
    encoder().writePing(ctx, false, USER_PING_PAYLOAD, promise);
1✔
848
    ctx.flush();
1✔
849
    final Http2Ping finalPing = ping;
1✔
850
    promise.addListener(new ChannelFutureListener() {
1✔
851
      @Override
852
      public void operationComplete(ChannelFuture future) throws Exception {
853
        if (future.isSuccess()) {
1✔
854
          transportTracer.reportKeepAliveSent();
1✔
855
        } else {
856
          Throwable cause = future.cause();
×
857
          if (cause instanceof ClosedChannelException) {
×
858
            cause = lifecycleManager.getShutdownThrowable();
×
859
            if (cause == null) {
×
860
              cause = Status.UNKNOWN.withDescription("Ping failed but for unknown reason.")
×
861
                  .withCause(future.cause()).asException();
×
862
            }
863
          }
864
          finalPing.failed(cause);
×
865
          if (ping == finalPing) {
×
866
            ping = null;
×
867
          }
868
        }
869
      }
1✔
870
    });
871
  }
1✔
872

873
  private void gracefulClose(ChannelHandlerContext ctx, GracefulCloseCommand msg,
874
      ChannelPromise promise) throws Exception {
875
    lifecycleManager.notifyShutdown(msg.getStatus());
1✔
876
    // Explicitly flush to create any buffered streams before sending GOAWAY.
877
    // TODO(ejona): determine if the need to flush is a bug in Netty
878
    flush(ctx);
1✔
879
    close(ctx, promise);
1✔
880
  }
1✔
881

882
  private void forcefulClose(final ChannelHandlerContext ctx, final ForcefulCloseCommand msg,
883
      ChannelPromise promise) throws Exception {
884
    connection().forEachActiveStream(new Http2StreamVisitor() {
1✔
885
      @Override
886
      public boolean visit(Http2Stream stream) throws Http2Exception {
887
        NettyClientStream.TransportState clientStream = clientStream(stream);
1✔
888
        Tag tag = clientStream != null ? clientStream.tag() : PerfMark.createTag();
1✔
889
        try (TaskCloseable ignore = PerfMark.traceTask("NettyClientHandler.forcefulClose")) {
1✔
890
          PerfMark.linkIn(msg.getLink());
1✔
891
          PerfMark.attachTag(tag);
1✔
892
          if (clientStream != null) {
1✔
893
            clientStream.transportReportStatus(msg.getStatus(), true, new Metadata());
1✔
894
            resetStream(ctx, stream.id(), Http2Error.CANCEL.code(), ctx.newPromise());
1✔
895
          }
896
          stream.close();
1✔
897
          return true;
1✔
898
        }
899
      }
900
    });
901
    close(ctx, promise);
1✔
902
  }
1✔
903

904
  /**
905
   * Handler for a GOAWAY being received. Fails any streams created after the
906
   * last known stream. May only be called during a read.
907
   */
908
  private void goingAway(long errorCode, byte[] debugData) {
909
    Status finalStatus = statusFromH2Error(
1✔
910
        Status.Code.UNAVAILABLE, "GOAWAY shut down transport", errorCode, debugData);
911
    lifecycleManager.notifyGracefulShutdown(finalStatus);
1✔
912
    abruptGoAwayStatus = statusFromH2Error(
1✔
913
        Status.Code.UNAVAILABLE, "Abrupt GOAWAY closed unsent stream", errorCode, debugData);
914
    // While this _should_ be UNAVAILABLE, Netty uses the wrong stream id in the GOAWAY when it
915
    // fails streams due to HPACK failures (e.g., header list too large). To be more conservative,
916
    // we assume any sent streams may be related to the GOAWAY. This should rarely impact users
917
    // since the main time servers should use abrupt GOAWAYs is if there is a protocol error, and if
918
    // there wasn't a protocol error the error code was probably NO_ERROR which is mapped to
919
    // UNAVAILABLE. https://github.com/netty/netty/issues/10670
920
    final Status abruptGoAwayStatusConservative = statusFromH2Error(
1✔
921
        null, "Abrupt GOAWAY closed sent stream", errorCode, debugData);
922
    final boolean mayBeHittingNettyBug = errorCode != Http2Error.NO_ERROR.code();
1✔
923
    // Try to allocate as many in-flight streams as possible, to reduce race window of
924
    // https://github.com/grpc/grpc-java/issues/2562 . To be of any help, the server has to
925
    // gracefully shut down the connection with two GOAWAYs. gRPC servers generally send a PING
926
    // after the first GOAWAY, so they can very precisely detect when the GOAWAY has been
927
    // processed and thus this processing must be in-line before processing additional reads.
928

929
    // This can cause reentrancy, but should be minor since it is normal to handle writes in
930
    // response to a read. Also, the call stack is rather shallow at this point
931
    clientWriteQueue.drainNow();
1✔
932
    if (lifecycleManager.notifyShutdown(finalStatus)) {
1✔
933
      // This is for the only RPCs that are actually covered by the GOAWAY error code. All other
934
      // RPCs were not observed by the remote and so should be UNAVAILABLE.
935
      channelInactiveReason = statusFromH2Error(
1✔
936
          null, "Connection closed after GOAWAY", errorCode, debugData);
937
    }
938

939
    final int lastKnownStream = connection().local().lastStreamKnownByPeer();
1✔
940
    try {
941
      connection().forEachActiveStream(new Http2StreamVisitor() {
1✔
942
        @Override
943
        public boolean visit(Http2Stream stream) throws Http2Exception {
944
          if (stream.id() > lastKnownStream) {
1✔
945
            NettyClientStream.TransportState clientStream = clientStream(stream);
1✔
946
            if (clientStream != null) {
1✔
947
              // RpcProgress _should_ be REFUSED, but are being conservative. See comment for
948
              // abruptGoAwayStatusConservative. This does reduce our ability to perform transparent
949
              // retries, but only if something else caused a connection failure.
950
              RpcProgress progress = mayBeHittingNettyBug
1✔
951
                  ? RpcProgress.PROCESSED
1✔
952
                  : RpcProgress.REFUSED;
1✔
953
              clientStream.transportReportStatus(
1✔
954
                  abruptGoAwayStatusConservative, progress, false, new Metadata());
955
            }
956
            stream.close();
1✔
957
          }
958
          return true;
1✔
959
        }
960
      });
961
    } catch (Http2Exception e) {
×
962
      throw new RuntimeException(e);
×
963
    }
1✔
964
  }
1✔
965

966
  private void cancelPing(Throwable t) {
967
    if (ping != null) {
1✔
968
      ping.failed(t);
1✔
969
      ping = null;
1✔
970
    }
971
  }
1✔
972

973
  /** If {@code statusCode} is non-null, it will be used instead of the http2 error code mapping. */
974
  private Status statusFromH2Error(
975
      Status.Code statusCode, String context, long errorCode, byte[] debugData) {
976
    Status status = GrpcUtil.Http2Error.statusForCode(errorCode);
1✔
977
    if (statusCode == null) {
1✔
978
      statusCode = status.getCode();
1✔
979
    }
980
    String debugString = "";
1✔
981
    if (debugData != null && debugData.length > 0) {
1✔
982
      // If a debug message was provided, use it.
983
      debugString = ", debug data: " + new String(debugData, UTF_8);
1✔
984
    }
985
    return statusCode.toStatus()
1✔
986
        .withDescription(context + ". " + status.getDescription() + debugString);
1✔
987
  }
988

989
  /**
990
   * Gets the client stream associated to the given HTTP/2 stream object.
991
   */
992
  private NettyClientStream.TransportState clientStream(Http2Stream stream) {
993
    return stream == null ? null : (NettyClientStream.TransportState) stream.getProperty(streamKey);
1✔
994
  }
995

996
  private int incrementAndGetNextStreamId() throws StatusException {
997
    int nextStreamId = connection().local().incrementAndGetNextStreamId();
1✔
998
    if (nextStreamId < 0) {
1✔
999
      logger.fine("Stream IDs have been exhausted for this connection. "
1✔
1000
              + "Initiating graceful shutdown of the connection.");
1001
      throw EXHAUSTED_STREAMS_STATUS.asException();
1✔
1002
    }
1003
    return nextStreamId;
1✔
1004
  }
1005

1006
  private Http2Stream requireHttp2Stream(int streamId) {
1007
    Http2Stream stream = connection().stream(streamId);
1✔
1008
    if (stream == null) {
1✔
1009
      // This should never happen.
1010
      throw new AssertionError("Stream does not exist: " + streamId);
×
1011
    }
1012
    return stream;
1✔
1013
  }
1014

1015
  private class FrameListener extends Http2FrameAdapter {
1✔
1016
    private boolean firstSettings = true;
1✔
1017

1018
    @Override
1019
    public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) {
1020
      if (firstSettings) {
1✔
1021
        firstSettings = false;
1✔
1022
        attributes = lifecycleManager.filterAttributes(attributes);
1✔
1023
        lifecycleManager.notifyReady();
1✔
1024
      }
1025
    }
1✔
1026

1027
    @Override
1028
    public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
1029
        boolean endOfStream) throws Http2Exception {
1030
      NettyClientHandler.this.onDataRead(streamId, data, padding, endOfStream);
1✔
1031
      return padding;
1✔
1032
    }
1033

1034
    @Override
1035
    public void onHeadersRead(ChannelHandlerContext ctx,
1036
        int streamId,
1037
        Http2Headers headers,
1038
        int streamDependency,
1039
        short weight,
1040
        boolean exclusive,
1041
        int padding,
1042
        boolean endStream) throws Http2Exception {
1043
      NettyClientHandler.this.onHeadersRead(streamId, headers, endStream);
1✔
1044
    }
1✔
1045

1046
    @Override
1047
    public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode)
1048
        throws Http2Exception {
1049
      NettyClientHandler.this.onRstStreamRead(streamId, errorCode);
1✔
1050
    }
1✔
1051

1052
    @Override
1053
    public void onPingAckRead(ChannelHandlerContext ctx, long ackPayload) throws Http2Exception {
1054
      Http2Ping p = ping;
1✔
1055
      if (ackPayload == flowControlPing().payload()) {
1✔
1056
        flowControlPing().updateWindow();
1✔
1057
        logger.log(Level.FINE, "Window: {0}",
1✔
1058
            decoder().flowController().initialWindowSize(connection().connectionStream()));
1✔
1059
      } else if (p != null) {
1✔
1060
        if (p.payload() == ackPayload) {
1✔
1061
          p.complete();
1✔
1062
          ping = null;
1✔
1063
        } else {
1064
          logger.log(Level.WARNING,
1✔
1065
              "Received unexpected ping ack. Expecting {0}, got {1}",
1066
              new Object[] {p.payload(), ackPayload});
1✔
1067
        }
1068
      } else {
1069
        logger.warning("Received unexpected ping ack. No ping outstanding");
×
1070
      }
1071
      if (keepAliveManager != null) {
1✔
1072
        keepAliveManager.onDataReceived();
1✔
1073
      }
1074
    }
1✔
1075

1076
    @Override
1077
    public void onPingRead(ChannelHandlerContext ctx, long data) throws Http2Exception {
1078
      if (keepAliveManager != null) {
1✔
1079
        keepAliveManager.onDataReceived();
×
1080
      }
1081
    }
1✔
1082
  }
1083

1084
  private static class PingCountingFrameWriter extends DecoratingHttp2FrameWriter
1085
      implements AbstractNettyHandler.PingLimiter {
1086
    private int pingCount;
1087

1088
    public PingCountingFrameWriter(Http2FrameWriter delegate) {
1089
      super(delegate);
1✔
1090
    }
1✔
1091

1092
    @Override
1093
    public boolean isPingAllowed() {
1094
      // "3 strikes" may cause the server to complain, so we limit ourselves to 2 or below.
1095
      return pingCount < 2;
1✔
1096
    }
1097

1098
    @Override
1099
    public ChannelFuture writeHeaders(
1100
        ChannelHandlerContext ctx, int streamId, Http2Headers headers,
1101
        int padding, boolean endStream, ChannelPromise promise) {
1102
      pingCount = 0;
×
1103
      return super.writeHeaders(ctx, streamId, headers, padding, endStream, promise);
×
1104
    }
1105

1106
    @Override
1107
    public ChannelFuture writeHeaders(
1108
        ChannelHandlerContext ctx, int streamId, Http2Headers headers,
1109
        int streamDependency, short weight, boolean exclusive,
1110
        int padding, boolean endStream, ChannelPromise promise) {
1111
      pingCount = 0;
1✔
1112
      return super.writeHeaders(ctx, streamId, headers, streamDependency, weight, exclusive,
1✔
1113
          padding, endStream, promise);
1114
    }
1115

1116
    @Override
1117
    public ChannelFuture writeWindowUpdate(
1118
        ChannelHandlerContext ctx, int streamId, int windowSizeIncrement, ChannelPromise promise) {
1119
      pingCount = 0;
1✔
1120
      return super.writeWindowUpdate(ctx, streamId, windowSizeIncrement, promise);
1✔
1121
    }
1122

1123
    @Override
1124
    public ChannelFuture writePing(
1125
        ChannelHandlerContext ctx, boolean ack, long data, ChannelPromise promise) {
1126
      if (!ack) {
1✔
1127
        pingCount++;
1✔
1128
      }
1129
      return super.writePing(ctx, ack, data, promise);
1✔
1130
    }
1131

1132
    @Override
1133
    public ChannelFuture writeData(
1134
        ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endStream,
1135
        ChannelPromise promise) {
1136
      if (data.isReadable()) {
1✔
1137
        pingCount = 0;
1✔
1138
      }
1139
      return super.writeData(ctx, streamId, data, padding, endStream, promise);
1✔
1140
    }
1141
  }
1142
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc