• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #20230

31 Mar 2026 09:55AM UTC coverage: 88.734% (+0.01%) from 88.72%
#20230

push

github

web-flow
openTelemetry: add tcp metrics (#12652)

Implements [A80](https://github.com/grpc/proposal/pull/519)

35697 of 40229 relevant lines covered (88.73%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.66
/../netty/src/main/java/io/grpc/netty/NettyClientHandler.java
1
/*
2
 * Copyright 2014 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.netty;
18

19
import static io.netty.handler.codec.http2.DefaultHttp2LocalFlowController.DEFAULT_WINDOW_UPDATE_RATIO;
20
import static io.netty.util.CharsetUtil.UTF_8;
21
import static io.netty.util.internal.ObjectUtil.checkNotNull;
22

23
import com.google.common.annotations.VisibleForTesting;
24
import com.google.common.base.Preconditions;
25
import com.google.common.base.Stopwatch;
26
import com.google.common.base.Supplier;
27
import com.google.common.base.Ticker;
28
import io.grpc.Attributes;
29
import io.grpc.ChannelLogger;
30
import io.grpc.InternalChannelz;
31
import io.grpc.InternalStatus;
32
import io.grpc.Metadata;
33
import io.grpc.MetricRecorder;
34
import io.grpc.Status;
35
import io.grpc.StatusException;
36
import io.grpc.internal.ClientStreamListener.RpcProgress;
37
import io.grpc.internal.ClientTransport.PingCallback;
38
import io.grpc.internal.DisconnectError;
39
import io.grpc.internal.GoAwayDisconnectError;
40
import io.grpc.internal.GrpcAttributes;
41
import io.grpc.internal.GrpcUtil;
42
import io.grpc.internal.Http2Ping;
43
import io.grpc.internal.InUseStateAggregator;
44
import io.grpc.internal.KeepAliveManager;
45
import io.grpc.internal.SimpleDisconnectError;
46
import io.grpc.internal.TransportTracer;
47
import io.grpc.netty.GrpcHttp2HeadersUtils.GrpcHttp2ClientHeadersDecoder;
48
import io.netty.buffer.ByteBuf;
49
import io.netty.buffer.ByteBufUtil;
50
import io.netty.buffer.Unpooled;
51
import io.netty.channel.Channel;
52
import io.netty.channel.ChannelFuture;
53
import io.netty.channel.ChannelFutureListener;
54
import io.netty.channel.ChannelHandlerContext;
55
import io.netty.channel.ChannelPromise;
56
import io.netty.handler.codec.http2.DecoratingHttp2FrameWriter;
57
import io.netty.handler.codec.http2.DefaultHttp2Connection;
58
import io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder;
59
import io.netty.handler.codec.http2.DefaultHttp2ConnectionEncoder;
60
import io.netty.handler.codec.http2.DefaultHttp2FrameReader;
61
import io.netty.handler.codec.http2.DefaultHttp2FrameWriter;
62
import io.netty.handler.codec.http2.DefaultHttp2HeadersEncoder;
63
import io.netty.handler.codec.http2.DefaultHttp2LocalFlowController;
64
import io.netty.handler.codec.http2.DefaultHttp2RemoteFlowController;
65
import io.netty.handler.codec.http2.Http2CodecUtil;
66
import io.netty.handler.codec.http2.Http2Connection;
67
import io.netty.handler.codec.http2.Http2ConnectionAdapter;
68
import io.netty.handler.codec.http2.Http2ConnectionDecoder;
69
import io.netty.handler.codec.http2.Http2ConnectionEncoder;
70
import io.netty.handler.codec.http2.Http2Error;
71
import io.netty.handler.codec.http2.Http2Exception;
72
import io.netty.handler.codec.http2.Http2FrameAdapter;
73
import io.netty.handler.codec.http2.Http2FrameLogger;
74
import io.netty.handler.codec.http2.Http2FrameReader;
75
import io.netty.handler.codec.http2.Http2FrameWriter;
76
import io.netty.handler.codec.http2.Http2Headers;
77
import io.netty.handler.codec.http2.Http2HeadersDecoder;
78
import io.netty.handler.codec.http2.Http2HeadersEncoder;
79
import io.netty.handler.codec.http2.Http2InboundFrameLogger;
80
import io.netty.handler.codec.http2.Http2OutboundFrameLogger;
81
import io.netty.handler.codec.http2.Http2Settings;
82
import io.netty.handler.codec.http2.Http2Stream;
83
import io.netty.handler.codec.http2.Http2StreamVisitor;
84
import io.netty.handler.codec.http2.StreamBufferingEncoder;
85
import io.netty.handler.codec.http2.UniformStreamByteDistributor;
86
import io.netty.handler.logging.LogLevel;
87
import io.perfmark.PerfMark;
88
import io.perfmark.Tag;
89
import io.perfmark.TaskCloseable;
90
import java.nio.channels.ClosedChannelException;
91
import java.util.LinkedHashMap;
92
import java.util.Map;
93
import java.util.concurrent.Executor;
94
import java.util.logging.Level;
95
import java.util.logging.Logger;
96
import javax.annotation.Nullable;
97

98
/**
99
 * Client-side Netty handler for GRPC processing. All event handlers are executed entirely within
100
 * the context of the Netty Channel thread.
101
 */
102
class NettyClientHandler extends AbstractNettyHandler {
103
  private static final Logger logger = Logger.getLogger(NettyClientHandler.class.getName());
1✔
104
  static boolean enablePerRpcAuthorityCheck =
1✔
105
      GrpcUtil.getFlag("GRPC_ENABLE_PER_RPC_AUTHORITY_CHECK", false);
1✔
106

107
  /**
108
   * A message that simply passes through the channel without any real processing. It is useful to
109
   * check if buffers have been drained and test the health of the channel in a single operation.
110
   */
111
  static final Object NOOP_MESSAGE = new Object();
1✔
112

113
  /**
114
   * Status used when the transport has exhausted the number of streams.
115
   */
116
  private static final Status EXHAUSTED_STREAMS_STATUS =
1✔
117
          Status.UNAVAILABLE.withDescription("Stream IDs have been exhausted");
1✔
118
  private static final long USER_PING_PAYLOAD = 1111;
119

120
  private final Http2Connection.PropertyKey streamKey;
121
  private final ClientTransportLifecycleManager lifecycleManager;
122
  private final KeepAliveManager keepAliveManager;
123
  // Returns new unstarted stopwatches
124
  private final Supplier<Stopwatch> stopwatchFactory;
125
  private final TransportTracer transportTracer;
126
  private final Attributes eagAttributes;
127
  private final TcpMetrics tcpMetrics;
128
  private final String authority;
129
  private final InUseStateAggregator<Http2Stream> inUseState =
1✔
130
      new InUseStateAggregator<Http2Stream>() {
1✔
131
        @Override
132
        protected void handleInUse() {
133
          lifecycleManager.notifyInUse(true);
1✔
134
        }
1✔
135

136
        @Override
137
        protected void handleNotInUse() {
138
          lifecycleManager.notifyInUse(false);
1✔
139
        }
1✔
140
      };
141
  private final Map<String, Status> peerVerificationResults =
1✔
142
      new LinkedHashMap<String, Status>() {
1✔
143
        @Override
144
        protected boolean removeEldestEntry(Map.Entry<String, Status> eldest) {
145
          return size() > 100;
1✔
146
        }
147
      };
148

149
  private WriteQueue clientWriteQueue;
150
  private Http2Ping ping;
151
  private Attributes attributes;
152
  private InternalChannelz.Security securityInfo;
153
  private Status abruptGoAwayStatus;
154
  private Status channelInactiveReason;
155

156
  static NettyClientHandler newHandler(
157
      ClientTransportLifecycleManager lifecycleManager,
158
      @Nullable KeepAliveManager keepAliveManager,
159
      boolean autoFlowControl,
160
      int flowControlWindow,
161
      int maxHeaderListSize,
162
      int softLimitHeaderListSize,
163
      Supplier<Stopwatch> stopwatchFactory,
164
      Runnable tooManyPingsRunnable,
165
      TransportTracer transportTracer,
166
      Attributes eagAttributes,
167
      String authority,
168
      ChannelLogger negotiationLogger,
169
      Ticker ticker,
170
      MetricRecorder metricRecorder) {
171
    Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive");
1✔
172
    Http2HeadersDecoder headersDecoder = new GrpcHttp2ClientHeadersDecoder(maxHeaderListSize);
1✔
173
    Http2FrameReader frameReader = new DefaultHttp2FrameReader(headersDecoder);
1✔
174
    Http2HeadersEncoder encoder = new DefaultHttp2HeadersEncoder(
1✔
175
        Http2HeadersEncoder.NEVER_SENSITIVE, false, 16, Integer.MAX_VALUE);
176
    Http2FrameWriter frameWriter = new DefaultHttp2FrameWriter(encoder);
1✔
177
    Http2Connection connection = new DefaultHttp2Connection(false);
1✔
178
    UniformStreamByteDistributor dist = new UniformStreamByteDistributor(connection);
1✔
179
    dist.minAllocationChunk(MIN_ALLOCATED_CHUNK); // Increased for benchmarks performance.
1✔
180
    DefaultHttp2RemoteFlowController controller =
1✔
181
        new DefaultHttp2RemoteFlowController(connection, dist);
182
    connection.remote().flowController(controller);
1✔
183

184
    return newHandler(
1✔
185
        connection,
186
        frameReader,
187
        frameWriter,
188
        lifecycleManager,
189
        keepAliveManager,
190
        autoFlowControl,
191
        flowControlWindow,
192
        maxHeaderListSize,
193
        softLimitHeaderListSize,
194
        stopwatchFactory,
195
        tooManyPingsRunnable,
196
        transportTracer,
197
        eagAttributes,
198
        authority,
199
        negotiationLogger,
200
        ticker,
201
        metricRecorder);
202
  }
203

204
  @VisibleForTesting
205
  static NettyClientHandler newHandler(
206
      final Http2Connection connection,
207
      Http2FrameReader frameReader,
208
      Http2FrameWriter frameWriter,
209
      ClientTransportLifecycleManager lifecycleManager,
210
      KeepAliveManager keepAliveManager,
211
      boolean autoFlowControl,
212
      int flowControlWindow,
213
      int maxHeaderListSize,
214
      int softLimitHeaderListSize,
215
      Supplier<Stopwatch> stopwatchFactory,
216
      Runnable tooManyPingsRunnable,
217
      TransportTracer transportTracer,
218
      Attributes eagAttributes,
219
      String authority,
220
      ChannelLogger negotiationLogger,
221
      Ticker ticker,
222
      MetricRecorder metricRecorder) {
223
    Preconditions.checkNotNull(connection, "connection");
1✔
224
    Preconditions.checkNotNull(frameReader, "frameReader");
1✔
225
    Preconditions.checkNotNull(lifecycleManager, "lifecycleManager");
1✔
226
    Preconditions.checkArgument(flowControlWindow > 0, "flowControlWindow must be positive");
1✔
227
    Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive");
1✔
228
    Preconditions.checkArgument(softLimitHeaderListSize > 0,
1✔
229
        "softLimitHeaderListSize must be positive");
230
    Preconditions.checkNotNull(stopwatchFactory, "stopwatchFactory");
1✔
231
    Preconditions.checkNotNull(tooManyPingsRunnable, "tooManyPingsRunnable");
1✔
232
    Preconditions.checkNotNull(eagAttributes, "eagAttributes");
1✔
233
    Preconditions.checkNotNull(authority, "authority");
1✔
234

235
    Http2FrameLogger frameLogger = new Http2FrameLogger(LogLevel.DEBUG, NettyClientHandler.class);
1✔
236
    frameReader = new Http2InboundFrameLogger(frameReader, frameLogger);
1✔
237
    frameWriter = new Http2OutboundFrameLogger(frameWriter, frameLogger);
1✔
238

239
    PingCountingFrameWriter pingCounter;
240
    frameWriter = pingCounter = new PingCountingFrameWriter(frameWriter);
1✔
241

242
    StreamBufferingEncoder encoder =
1✔
243
        new StreamBufferingEncoder(
244
            new DefaultHttp2ConnectionEncoder(connection, frameWriter));
245

246
    // Create the local flow controller configured to auto-refill the connection window.
247
    connection.local().flowController(
1✔
248
        new DefaultHttp2LocalFlowController(connection, DEFAULT_WINDOW_UPDATE_RATIO, true));
249

250
    Http2ConnectionDecoder decoder = new DefaultHttp2ConnectionDecoder(connection, encoder,
1✔
251
        frameReader);
252

253
    transportTracer.setFlowControlWindowReader(new Utils.FlowControlReader(connection));
1✔
254

255
    Http2Settings settings = new Http2Settings();
1✔
256
    settings.pushEnabled(false);
1✔
257
    settings.initialWindowSize(flowControlWindow);
1✔
258
    settings.maxConcurrentStreams(0);
1✔
259
    settings.maxHeaderListSize(maxHeaderListSize);
1✔
260

261
    return new NettyClientHandler(
1✔
262
        decoder,
263
        encoder,
264
        settings,
265
        negotiationLogger,
266
        lifecycleManager,
267
        keepAliveManager,
268
        stopwatchFactory,
269
        tooManyPingsRunnable,
270
        transportTracer,
271
        eagAttributes,
272
        authority,
273
        autoFlowControl,
274
        pingCounter,
275
        ticker,
276
        maxHeaderListSize,
277
        softLimitHeaderListSize,
278
        metricRecorder);
279
  }
280

281
  /**
   * Stores the collaborators, installs the frame listener on the decoder and registers a
   * connection listener that reacts to GOAWAY frames and to streams becoming active or closed.
   */
  private NettyClientHandler(
      Http2ConnectionDecoder decoder,
      Http2ConnectionEncoder encoder,
      Http2Settings settings,
      ChannelLogger negotiationLogger,
      ClientTransportLifecycleManager lifecycleManager,
      KeepAliveManager keepAliveManager,
      Supplier<Stopwatch> stopwatchFactory,
      final Runnable tooManyPingsRunnable,
      TransportTracer transportTracer,
      Attributes eagAttributes,
      String authority,
      boolean autoFlowControl,
      PingLimiter pingLimiter,
      Ticker ticker,
      int maxHeaderListSize,
      int softLimitHeaderListSize,
      MetricRecorder metricRecorder) {
    super(
        /* channelUnused= */ null,
        decoder,
        encoder,
        settings,
        negotiationLogger,
        autoFlowControl,
        pingLimiter,
        ticker,
        maxHeaderListSize,
        softLimitHeaderListSize);
    this.lifecycleManager = lifecycleManager;
    this.keepAliveManager = keepAliveManager;
    this.stopwatchFactory = stopwatchFactory;
    this.transportTracer = Preconditions.checkNotNull(transportTracer);
    this.eagAttributes = eagAttributes;
    this.authority = authority;
    // Until protocol negotiation completes, the attributes hold only the EAG attributes.
    this.attributes =
        Attributes.newBuilder().set(GrpcAttributes.ATTR_CLIENT_EAG_ATTRS, eagAttributes).build();
    this.tcpMetrics = new TcpMetrics(metricRecorder);

    // Set the frame listener on the decoder.
    decoder().frameListener(new FrameListener());

    Http2Connection http2Connection = encoder.connection();
    this.streamKey = http2Connection.newKey();

    http2Connection.addListener(new Http2ConnectionAdapter() {
      @Override
      public void onGoAwayReceived(int lastStreamId, long errorCode, ByteBuf debugData) {
        byte[] debugBytes = ByteBufUtil.getBytes(debugData);
        goingAway(errorCode, debugBytes);
        if (errorCode != Http2Error.ENHANCE_YOUR_CALM.code()) {
          return;
        }
        String debugText = new String(debugBytes, UTF_8);
        logger.log(
            Level.WARNING, "Received GOAWAY with ENHANCE_YOUR_CALM. Debug data: {0}", debugText);
        if ("too_many_pings".equals(debugText)) {
          tooManyPingsRunnable.run();
        }
      }

      @Override
      public void onStreamActive(Http2Stream stream) {
        // Only the first active stream switches keepalive to the active state.
        boolean firstActive = connection().numActiveStreams() == 1;
        if (firstActive && NettyClientHandler.this.keepAliveManager != null) {
          NettyClientHandler.this.keepAliveManager.onTransportActive();
        }
      }

      @Override
      public void onStreamClosed(Http2Stream stream) {
        // Although streams with CALL_OPTIONS_RPC_OWNED_BY_BALANCER are not marked as "in-use" in
        // the first place, we don't propagate that option here, and it's safe to reset the in-use
        // state for them, which will be a cheap no-op.
        inUseState.updateObjectInUse(stream, false);
        boolean noneActive = connection().numActiveStreams() == 0;
        if (noneActive && NettyClientHandler.this.keepAliveManager != null) {
          NettyClientHandler.this.keepAliveManager.onTransportIdle();
        }
      }
    });
  }
1✔
362

363
  /**
364
   * The protocol negotiation attributes, available once the protocol negotiation completes;
365
   * otherwise returns {@code Attributes.EMPTY}.
366
   */
367
  Attributes getAttributes() {
368
    return attributes;
1✔
369
  }
370

371
  /**
372
   * Handler for commands sent from the stream.
373
   */
374
  @Override
375
  public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
376
          throws Exception {
377
    if (msg instanceof CreateStreamCommand) {
1✔
378
      createStream((CreateStreamCommand) msg, promise);
1✔
379
    } else if (msg instanceof SendGrpcFrameCommand) {
1✔
380
      sendGrpcFrame(ctx, (SendGrpcFrameCommand) msg, promise);
1✔
381
    } else if (msg instanceof CancelClientStreamCommand) {
1✔
382
      cancelStream(ctx, (CancelClientStreamCommand) msg, promise);
1✔
383
    } else if (msg instanceof SendPingCommand) {
1✔
384
      sendPingFrame(ctx, (SendPingCommand) msg, promise);
1✔
385
    } else if (msg instanceof GracefulCloseCommand) {
1✔
386
      gracefulClose(ctx, (GracefulCloseCommand) msg, promise);
1✔
387
    } else if (msg instanceof ForcefulCloseCommand) {
1✔
388
      forcefulClose(ctx, (ForcefulCloseCommand) msg, promise);
1✔
389
    } else if (msg == NOOP_MESSAGE) {
1✔
390
      ctx.write(Unpooled.EMPTY_BUFFER, promise);
1✔
391
    } else {
392
      throw new AssertionError("Write called for unexpected type: " + msg.getClass().getName());
×
393
    }
394
  }
1✔
395

396
  void startWriteQueue(Channel channel) {
397
    clientWriteQueue = new WriteQueue(channel);
1✔
398
  }
1✔
399

400
  WriteQueue getWriteQueue() {
401
    return clientWriteQueue;
1✔
402
  }
403

404
  ClientTransportLifecycleManager getLifecycleManager() {
405
    return lifecycleManager;
1✔
406
  }
407

408
  /**
409
   * Returns the given processed bytes back to inbound flow control.
410
   */
411
  void returnProcessedBytes(Http2Stream stream, int bytes) {
412
    try {
413
      decoder().flowController().consumeBytes(stream, bytes);
1✔
414
    } catch (Http2Exception e) {
×
415
      throw new RuntimeException(e);
×
416
    }
1✔
417
  }
1✔
418

419
  private void onHeadersRead(int streamId, Http2Headers headers, boolean endStream) {
420
    // Stream 1 is reserved for the Upgrade response, so we should ignore its headers here:
421
    if (streamId != Http2CodecUtil.HTTP_UPGRADE_STREAM_ID) {
1✔
422
      NettyClientStream.TransportState stream = clientStream(requireHttp2Stream(streamId));
1✔
423
      PerfMark.event("NettyClientHandler.onHeadersRead", stream.tag());
1✔
424
      // check metadata size vs soft limit
425
      int h2HeadersSize = Utils.getH2HeadersSize(headers);
1✔
426
      boolean shouldFail =
1✔
427
          Utils.shouldRejectOnMetadataSizeSoftLimitExceeded(
1✔
428
              h2HeadersSize, softLimitHeaderListSize, maxHeaderListSize);
429
      if (shouldFail && endStream) {
1✔
430
        stream.transportReportStatus(Status.RESOURCE_EXHAUSTED
×
431
            .withDescription(
×
432
                String.format(
×
433
                    "Server Status + Trailers of size %d exceeded Metadata size soft limit: %d",
434
                    h2HeadersSize,
×
435
                    softLimitHeaderListSize)), true, new Metadata());
×
436
        return;
×
437
      } else if (shouldFail) {
1✔
438
        stream.transportReportStatus(Status.RESOURCE_EXHAUSTED
1✔
439
            .withDescription(
1✔
440
                String.format(
1✔
441
                    "Server Headers of size %d exceeded Metadata size soft limit: %d",
442
                    h2HeadersSize,
1✔
443
                    softLimitHeaderListSize)), true, new Metadata());
1✔
444
        return;
1✔
445
      }
446
      stream.transportHeadersReceived(headers, endStream);
1✔
447
    }
448

449
    if (keepAliveManager != null) {
1✔
450
      keepAliveManager.onDataReceived();
1✔
451
    }
452
  }
1✔
453

454
  /**
455
   * Handler for an inbound HTTP/2 DATA frame.
456
   */
457
  private void onDataRead(int streamId, ByteBuf data, int padding, boolean endOfStream) {
458
    flowControlPing().onDataRead(data.readableBytes(), padding);
1✔
459
    NettyClientStream.TransportState stream = clientStream(requireHttp2Stream(streamId));
1✔
460
    PerfMark.event("NettyClientHandler.onDataRead", stream.tag());
1✔
461
    stream.transportDataReceived(data, endOfStream);
1✔
462
    if (keepAliveManager != null) {
1✔
463
      keepAliveManager.onDataReceived();
1✔
464
    }
465
  }
1✔
466

467
  /**
468
   * Handler for an inbound HTTP/2 RST_STREAM frame, terminating a stream.
469
   */
470
  private void onRstStreamRead(int streamId, long errorCode) {
471
    NettyClientStream.TransportState stream = clientStream(connection().stream(streamId));
1✔
472
    if (stream != null) {
1✔
473
      PerfMark.event("NettyClientHandler.onRstStreamRead", stream.tag());
1✔
474
      Status status = statusFromH2Error(null, "RST_STREAM closed stream", errorCode, null);
1✔
475
      stream.transportReportStatus(
1✔
476
          status,
477
          errorCode == Http2Error.REFUSED_STREAM.code()
1✔
478
              ? RpcProgress.REFUSED : RpcProgress.PROCESSED,
1✔
479
          false /*stop delivery*/,
480
          new Metadata());
481
      if (keepAliveManager != null) {
1✔
482
        keepAliveManager.onDataReceived();
1✔
483
      }
484
    }
485
  }
1✔
486

487
  @Override
488
  public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
489
    tcpMetrics.recordTcpInfo(ctx.channel());
1✔
490
    logger.fine("Network channel being closed by the application.");
1✔
491
    if (ctx.channel().isActive()) { // Ignore notification that the socket was closed
1✔
492
      lifecycleManager.notifyShutdown(
1✔
493
          Status.UNAVAILABLE.withDescription("Transport closed for unknown reason"),
1✔
494
          SimpleDisconnectError.UNKNOWN);
495
    }
496
    super.close(ctx, promise);
1✔
497
  }
1✔
498

499
  /**
500
   * Handler for the Channel shutting down.
501
   */
502
  @Override
503
  public void channelActive(ChannelHandlerContext ctx) throws Exception {
504
    tcpMetrics.channelActive(ctx.channel());
1✔
505
    super.channelActive(ctx);
1✔
506
  }
1✔
507

508
  @Override
509
  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
510
    try {
511
      logger.fine("Network channel is closed");
1✔
512
      tcpMetrics.channelInactive(ctx.channel());
1✔
513
      Status status = Status.UNAVAILABLE.withDescription("Network closed for unknown reason");
1✔
514
      lifecycleManager.notifyShutdown(status, SimpleDisconnectError.UNKNOWN);
1✔
515
      final Status streamStatus;
516
      if (channelInactiveReason != null) {
1✔
517
        streamStatus = channelInactiveReason;
1✔
518
      } else {
519
        streamStatus = lifecycleManager.getShutdownStatus();
1✔
520
      }
521
      try {
522
        cancelPing(lifecycleManager.getShutdownStatus());
1✔
523
        // Report status to the application layer for any open streams
524
        connection().forEachActiveStream(new Http2StreamVisitor() {
1✔
525
          @Override
526
          public boolean visit(Http2Stream stream) throws Http2Exception {
527
            NettyClientStream.TransportState clientStream = clientStream(stream);
1✔
528
            if (clientStream != null) {
1✔
529
              clientStream.transportReportStatus(streamStatus, false, new Metadata());
1✔
530
            }
531
            return true;
1✔
532
          }
533
        });
534
      } finally {
535
        lifecycleManager.notifyTerminated(status, SimpleDisconnectError.UNKNOWN);
1✔
536
      }
537
    } finally {
538
      // Close any open streams
539
      super.channelInactive(ctx);
1✔
540
      if (keepAliveManager != null) {
1✔
541
        keepAliveManager.onTransportTermination();
1✔
542
      }
543
    }
544
  }
1✔
545

546
  @Override
547
  public void handleProtocolNegotiationCompleted(
548
      Attributes attributes, InternalChannelz.Security securityInfo) {
549
    this.attributes = this.attributes.toBuilder().setAll(attributes).build();
1✔
550
    this.securityInfo = securityInfo;
1✔
551
    super.handleProtocolNegotiationCompleted(attributes, securityInfo);
1✔
552
    writeBufferingAndRemove(ctx().channel());
1✔
553
  }
1✔
554

555
  static void writeBufferingAndRemove(Channel channel) {
556
    checkNotNull(channel, "channel");
1✔
557
    ChannelHandlerContext handlerCtx =
1✔
558
        channel.pipeline().context(WriteBufferingAndExceptionHandler.class);
1✔
559
    if (handlerCtx == null) {
1✔
560
      return;
1✔
561
    }
562
    ((WriteBufferingAndExceptionHandler) handlerCtx.handler()).writeBufferedAndRemove(handlerCtx);
1✔
563
  }
1✔
564

565
  @Override
566
  public Attributes getEagAttributes() {
567
    return eagAttributes;
1✔
568
  }
569

570
  @Override
571
  public String getAuthority() {
572
    return authority;
1✔
573
  }
574

575
  InternalChannelz.Security getSecurityInfo() {
576
    return securityInfo;
1✔
577
  }
578

579
  @Override
580
  protected void onConnectionError(ChannelHandlerContext ctx,  boolean outbound, Throwable cause,
581
      Http2Exception http2Ex) {
582
    logger.log(Level.FINE, "Caught a connection error", cause);
1✔
583
    lifecycleManager.notifyShutdown(Utils.statusFromThrowable(cause),
1✔
584
        SimpleDisconnectError.SOCKET_ERROR);
585
    // Parent class will shut down the Channel
586
    super.onConnectionError(ctx, outbound, cause, http2Ex);
1✔
587
  }
1✔
588

589
  @Override
590
  protected void onStreamError(ChannelHandlerContext ctx, boolean outbound, Throwable cause,
591
      Http2Exception.StreamException http2Ex) {
592
    // Close the stream with a status that contains the cause.
593
    NettyClientStream.TransportState stream = clientStream(connection().stream(http2Ex.streamId()));
1✔
594
    if (stream != null) {
1✔
595
      stream.transportReportStatus(Utils.statusFromThrowable(cause), false, new Metadata());
1✔
596
    } else {
597
      logger.log(Level.FINE, "Stream error for unknown stream " + http2Ex.streamId(), cause);
1✔
598
    }
599

600
    // Delegate to the base class to send a RST_STREAM.
601
    super.onStreamError(ctx, outbound, cause, http2Ex);
1✔
602
  }
1✔
603

604
  @Override
605
  protected boolean isGracefulShutdownComplete() {
606
    // Only allow graceful shutdown to complete after all pending streams have completed.
607
    return super.isGracefulShutdownComplete()
1✔
608
        && ((StreamBufferingEncoder) encoder()).numBufferedStreams() == 0;
1✔
609
  }
610

611
  /**
612
   * Attempts to create a new stream from the given command. If there are too many active streams,
613
   * the creation request is queued.
614
   */
615
  private void createStream(CreateStreamCommand command, ChannelPromise promise)
616
          throws Exception {
617
    if (lifecycleManager.getShutdownStatus() != null) {
1✔
618
      command.stream().setNonExistent();
1✔
619
      // The connection is going away (it is really the GOAWAY case),
620
      // just terminate the stream now.
621
      command.stream().transportReportStatus(
1✔
622
          lifecycleManager.getShutdownStatus(), RpcProgress.MISCARRIED, true, new Metadata());
1✔
623
      promise.setFailure(InternalStatus.asRuntimeExceptionWithoutStacktrace(
1✔
624
              lifecycleManager.getShutdownStatus(), null));
1✔
625
      return;
1✔
626
    }
627

628
    CharSequence authorityHeader = command.headers().authority();
1✔
629
    if (authorityHeader == null) {
1✔
630
      Status authorityVerificationStatus = Status.UNAVAILABLE.withDescription(
1✔
631
              "Missing authority header");
632
      command.stream().setNonExistent();
1✔
633
      command.stream().transportReportStatus(
1✔
634
              Status.UNAVAILABLE, RpcProgress.PROCESSED, true, new Metadata());
635
      promise.setFailure(InternalStatus.asRuntimeExceptionWithoutStacktrace(
1✔
636
              authorityVerificationStatus, null));
637
      return;
1✔
638
    }
639
    // No need to verify authority for the rpc outgoing header if it is same as the authority
640
    // for the transport
641
    if (!authority.contentEquals(authorityHeader)) {
1✔
642
      Status authorityVerificationStatus = peerVerificationResults.get(
1✔
643
              authorityHeader.toString());
1✔
644
      if (authorityVerificationStatus == null) {
1✔
645
        if (attributes.get(GrpcAttributes.ATTR_AUTHORITY_VERIFIER) == null) {
1✔
646
          authorityVerificationStatus = Status.UNAVAILABLE.withDescription(
1✔
647
                  "Authority verifier not found to verify authority");
648
          command.stream().setNonExistent();
1✔
649
          command.stream().transportReportStatus(
1✔
650
                  authorityVerificationStatus, RpcProgress.PROCESSED, true, new Metadata());
651
          promise.setFailure(InternalStatus.asRuntimeExceptionWithoutStacktrace(
1✔
652
                  authorityVerificationStatus, null));
653
          return;
1✔
654
        }
655
        authorityVerificationStatus = attributes.get(GrpcAttributes.ATTR_AUTHORITY_VERIFIER)
1✔
656
                .verifyAuthority(authorityHeader.toString());
1✔
657
        peerVerificationResults.put(authorityHeader.toString(), authorityVerificationStatus);
1✔
658
        if (!authorityVerificationStatus.isOk() && !enablePerRpcAuthorityCheck) {
1✔
659
          logger.log(Level.WARNING, String.format("%s.%s",
1✔
660
                          authorityVerificationStatus.getDescription(),
1✔
661
                          enablePerRpcAuthorityCheck
1✔
662
                                  ? "" : " This will be an error in the future."),
1✔
663
                  InternalStatus.asRuntimeExceptionWithoutStacktrace(
1✔
664
                          authorityVerificationStatus, null));
665
        }
666
      }
667
      if (!authorityVerificationStatus.isOk()) {
1✔
668
        if (enablePerRpcAuthorityCheck) {
1✔
669
          command.stream().setNonExistent();
1✔
670
          command.stream().transportReportStatus(
1✔
671
                  authorityVerificationStatus, RpcProgress.PROCESSED, true, new Metadata());
672
          promise.setFailure(InternalStatus.asRuntimeExceptionWithoutStacktrace(
1✔
673
                  authorityVerificationStatus, null));
674
          return;
1✔
675
        }
676
      }
677
    }
678
    // Get the stream ID for the new stream.
679
    int streamId;
680
    try {
681
      streamId = incrementAndGetNextStreamId();
1✔
682
    } catch (StatusException e) {
1✔
683
      command.stream().setNonExistent();
1✔
684
      // Stream IDs have been exhausted for this connection. Fail the promise immediately.
685
      promise.setFailure(e);
1✔
686

687
      // Initiate a graceful shutdown if we haven't already.
688
      if (!connection().goAwaySent()) {
1✔
689
        logger.fine("Stream IDs have been exhausted for this connection. "
1✔
690
                + "Initiating graceful shutdown of the connection.");
691
        lifecycleManager.notifyShutdown(e.getStatus(), SimpleDisconnectError.UNKNOWN);
1✔
692
        close(ctx(), ctx().newPromise());
1✔
693
      }
694
      return;
1✔
695
    }
1✔
696
    if (connection().goAwayReceived()) {
1✔
697
      Status s = abruptGoAwayStatus;
1✔
698
      int maxActiveStreams = connection().local().maxActiveStreams();
1✔
699
      int lastStreamId = connection().local().lastStreamKnownByPeer();
1✔
700
      if (s == null) {
1✔
701
        // Should be impossible, but handle pseudo-gracefully
702
        s = Status.INTERNAL.withDescription(
×
703
            "Failed due to abrupt GOAWAY, but can't find GOAWAY details");
704
      } else if (streamId > lastStreamId) {
1✔
705
        s = s.augmentDescription(
1✔
706
            "stream id: " + streamId + ", GOAWAY Last-Stream-ID:" + lastStreamId);
707
      } else if (connection().local().numActiveStreams() == maxActiveStreams) {
1✔
708
        s = s.augmentDescription("At MAX_CONCURRENT_STREAMS limit. limit: " + maxActiveStreams);
1✔
709
      }
710
      if (streamId > lastStreamId || connection().local().numActiveStreams() == maxActiveStreams) {
1✔
711
        // This should only be reachable during onGoAwayReceived, as otherwise
712
        // getShutdownThrowable() != null
713
        command.stream().setNonExistent();
1✔
714
        command.stream().transportReportStatus(s, RpcProgress.MISCARRIED, true, new Metadata());
1✔
715
        promise.setFailure(s.asRuntimeException());
1✔
716
        return;
1✔
717
      }
718
    }
719

720
    NettyClientStream.TransportState stream = command.stream();
1✔
721
    Http2Headers headers = command.headers();
1✔
722
    stream.setId(streamId);
1✔
723

724
    try (TaskCloseable ignore = PerfMark.traceTask("NettyClientHandler.createStream")) {
1✔
725
      PerfMark.linkIn(command.getLink());
1✔
726
      PerfMark.attachTag(stream.tag());
1✔
727
      createStreamTraced(
1✔
728
          streamId, stream, headers, command.isGet(), command.shouldBeCountedForInUse(), promise);
1✔
729
    }
730
  }
1✔
731

732
  private void createStreamTraced(
733
      final int streamId,
734
      final NettyClientStream.TransportState stream,
735
      final Http2Headers headers,
736
      boolean isGet,
737
      final boolean shouldBeCountedForInUse,
738
      final ChannelPromise promise) {
739
    // Create an intermediate promise so that we can intercept the failure reported back to the
740
    // application.
741
    ChannelPromise tempPromise = ctx().newPromise();
1✔
742
    encoder().writeHeaders(ctx(), streamId, headers, 0, isGet, tempPromise)
1✔
743
        .addListener(new ChannelFutureListener() {
1✔
744
          @Override
745
          public void operationComplete(ChannelFuture future) throws Exception {
746
            if (future.isSuccess()) {
1✔
747
              // The http2Stream will be null in case a stream buffered in the encoder
748
              // was canceled via RST_STREAM.
749
              Http2Stream http2Stream = connection().stream(streamId);
1✔
750
              if (http2Stream != null) {
1✔
751
                stream.getStatsTraceContext().clientOutboundHeaders();
1✔
752
                http2Stream.setProperty(streamKey, stream);
1✔
753

754
                // This delays the in-use state until the I/O completes, which technically may
755
                // be later than we would like.
756
                if (shouldBeCountedForInUse) {
1✔
757
                  inUseState.updateObjectInUse(http2Stream, true);
1✔
758
                }
759

760
                // Attach the client stream to the HTTP/2 stream object as user data.
761
                stream.setHttp2Stream(http2Stream);
1✔
762
                promise.setSuccess();
1✔
763
              } else {
764
                // Otherwise, the stream has been cancelled and Netty is sending a
765
                // RST_STREAM frame which causes it to purge pending writes from the
766
                // flow-controller and delete the http2Stream. The stream listener has already
767
                // been notified of cancellation so there is nothing to do.
768
                //
769
                // This process has been observed to fail in some circumstances, leaving listeners
770
                // unanswered. Ensure that some exception has been delivered consistent with the
771
                // implied RST_STREAM result above.
772
                Status status = Status.INTERNAL.withDescription("unknown stream for connection");
1✔
773
                promise.setFailure(status.asRuntimeException());
1✔
774
              }
775
            } else {
1✔
776
              Throwable cause = future.cause();
1✔
777
              if (cause instanceof StreamBufferingEncoder.Http2GoAwayException) {
1✔
778
                StreamBufferingEncoder.Http2GoAwayException e =
1✔
779
                    (StreamBufferingEncoder.Http2GoAwayException) cause;
780
                Status status = statusFromH2Error(
1✔
781
                    Status.Code.UNAVAILABLE, "GOAWAY closed buffered stream",
782
                    e.errorCode(), e.debugData());
1✔
783
                cause = status.asRuntimeException();
1✔
784
                stream.transportReportStatus(status, RpcProgress.MISCARRIED, true, new Metadata());
1✔
785
              } else if (cause instanceof StreamBufferingEncoder.Http2ChannelClosedException) {
1✔
786
                Status status = lifecycleManager.getShutdownStatus();
1✔
787
                if (status == null) {
1✔
788
                  status = Status.UNAVAILABLE.withCause(cause)
×
789
                      .withDescription("Connection closed while stream is buffered");
×
790
                }
791
                stream.transportReportStatus(status, RpcProgress.MISCARRIED, true, new Metadata());
1✔
792
              }
793
              promise.setFailure(cause);
1✔
794
            }
795
          }
1✔
796
        });
797
    // When the HEADERS are not buffered because of MAX_CONCURRENT_STREAMS in
798
    // StreamBufferingEncoder, the stream is created immediately even if the bytes of the HEADERS
799
    // are delayed because the OS may have too much buffered and isn't accepting the write. The
800
    // write promise is also delayed until flush(). However, we need to associate the netty stream
801
    // with the transport state so that goingAway() and forcefulClose() and able to notify the
802
    // stream of failures.
803
    //
804
    // This leaves a hole when MAX_CONCURRENT_STREAMS is reached, as http2Stream will be null, but
805
    // it is better than nothing.
806
    Http2Stream http2Stream = connection().stream(streamId);
1✔
807
    if (http2Stream != null) {
1✔
808
      http2Stream.setProperty(streamKey, stream);
1✔
809
    }
810
  }
1✔
811

812
  /**
813
   * Cancels this stream.
814
   */
815
  private void cancelStream(ChannelHandlerContext ctx, CancelClientStreamCommand cmd,
816
      ChannelPromise promise) {
817
    NettyClientStream.TransportState stream = cmd.stream();
1✔
818
    try (TaskCloseable ignore = PerfMark.traceTask("NettyClientHandler.cancelStream")) {
1✔
819
      PerfMark.attachTag(stream.tag());
1✔
820
      PerfMark.linkIn(cmd.getLink());
1✔
821
      Status reason = cmd.reason();
1✔
822
      if (reason != null) {
1✔
823
        stream.transportReportStatus(reason, true, new Metadata());
1✔
824
      }
825
      if (!cmd.stream().isNonExistent()) {
1✔
826
        encoder().writeRstStream(ctx, stream.id(), Http2Error.CANCEL.code(), promise);
1✔
827
      } else {
828
        promise.setSuccess();
1✔
829
      }
830
    }
831
  }
1✔
832

833
  /**
834
   * Sends the given GRPC frame for the stream.
835
   */
836
  private void sendGrpcFrame(ChannelHandlerContext ctx, SendGrpcFrameCommand cmd,
837
      ChannelPromise promise) {
838
    try (TaskCloseable ignore = PerfMark.traceTask("NettyClientHandler.sendGrpcFrame")) {
1✔
839
      PerfMark.attachTag(cmd.stream().tag());
1✔
840
      PerfMark.linkIn(cmd.getLink());
1✔
841
      // Call the base class to write the HTTP/2 DATA frame.
842
      // Note: no need to flush since this is handled by the outbound flow controller.
843
      encoder().writeData(ctx, cmd.stream().id(), cmd.content(), 0, cmd.endStream(), promise);
1✔
844
    }
845
  }
1✔
846

847
  private void sendPingFrame(ChannelHandlerContext ctx, SendPingCommand msg,
848
      ChannelPromise promise) {
849
    try (TaskCloseable ignore = PerfMark.traceTask("NettyClientHandler.sendPingFrame")) {
1✔
850
      PerfMark.linkIn(msg.getLink());
1✔
851
      sendPingFrameTraced(ctx, msg, promise);
1✔
852
    }
853
  }
1✔
854

855
  /**
856
   * Sends a PING frame. If a ping operation is already outstanding, the callback in the message is
857
   * registered to be called when the existing operation completes, and no new frame is sent.
858
   */
859
  private void sendPingFrameTraced(ChannelHandlerContext ctx, SendPingCommand msg,
860
      ChannelPromise promise) {
861
    // Don't check lifecycleManager.getShutdownStatus() since we want to allow pings after shutdown
862
    // but before termination. After termination, messages will no longer arrive because the
863
    // pipeline clears all handlers on channel close.
864

865
    PingCallback callback = msg.callback();
1✔
866
    Executor executor = msg.executor();
1✔
867
    // we only allow one outstanding ping at a time, so just add the callback to
868
    // any outstanding operation
869
    if (ping != null) {
1✔
870
      promise.setSuccess();
1✔
871
      ping.addCallback(callback, executor);
1✔
872
      return;
1✔
873
    }
874

875
    // Use a new promise to prevent calling the callback twice on write failure: here and in
876
    // NettyClientTransport.ping(). It may appear strange, but it will behave the same as if
877
    // ping != null above.
878
    promise.setSuccess();
1✔
879
    promise = ctx().newPromise();
1✔
880
    // set outstanding operation
881
    long data = USER_PING_PAYLOAD;
1✔
882
    Stopwatch stopwatch = stopwatchFactory.get();
1✔
883
    stopwatch.start();
1✔
884
    ping = new Http2Ping(data, stopwatch);
1✔
885
    ping.addCallback(callback, executor);
1✔
886
    // and then write the ping
887
    encoder().writePing(ctx, false, USER_PING_PAYLOAD, promise);
1✔
888
    ctx.flush();
1✔
889
    final Http2Ping finalPing = ping;
1✔
890
    promise.addListener(new ChannelFutureListener() {
1✔
891
      @Override
892
      public void operationComplete(ChannelFuture future) throws Exception {
893
        if (future.isSuccess()) {
1✔
894
          transportTracer.reportKeepAliveSent();
1✔
895
          return;
1✔
896
        }
897
        Throwable cause = future.cause();
×
898
        Status status = lifecycleManager.getShutdownStatus();
×
899
        if (cause instanceof ClosedChannelException) {
×
900
          if (status == null) {
×
901
            status = Status.UNKNOWN.withDescription("Ping failed but for unknown reason.")
×
902
                    .withCause(future.cause());
×
903
          }
904
        } else {
905
          status = Utils.statusFromThrowable(cause);
×
906
        }
907
        finalPing.failed(status);
×
908
        if (ping == finalPing) {
×
909
          ping = null;
×
910
        }
911
      }
×
912
    });
913
  }
1✔
914

915
  private void gracefulClose(ChannelHandlerContext ctx, GracefulCloseCommand msg,
916
      ChannelPromise promise) throws Exception {
917
    lifecycleManager.notifyShutdown(msg.getStatus(), SimpleDisconnectError.SUBCHANNEL_SHUTDOWN);
1✔
918
    // Explicitly flush to create any buffered streams before sending GOAWAY.
919
    // TODO(ejona): determine if the need to flush is a bug in Netty
920
    flush(ctx);
1✔
921
    close(ctx, promise);
1✔
922
  }
1✔
923

924
  private void forcefulClose(final ChannelHandlerContext ctx, final ForcefulCloseCommand msg,
925
      ChannelPromise promise) throws Exception {
926
    connection().forEachActiveStream(new Http2StreamVisitor() {
1✔
927
      @Override
928
      public boolean visit(Http2Stream stream) throws Http2Exception {
929
        NettyClientStream.TransportState clientStream = clientStream(stream);
1✔
930
        Tag tag = clientStream != null ? clientStream.tag() : PerfMark.createTag();
1✔
931
        try (TaskCloseable ignore = PerfMark.traceTask("NettyClientHandler.forcefulClose")) {
1✔
932
          PerfMark.linkIn(msg.getLink());
1✔
933
          PerfMark.attachTag(tag);
1✔
934
          if (clientStream != null) {
1✔
935
            clientStream.transportReportStatus(msg.getStatus(), true, new Metadata());
1✔
936
            resetStream(ctx, stream.id(), Http2Error.CANCEL.code(), ctx.newPromise());
1✔
937
          }
938
          stream.close();
1✔
939
          return true;
1✔
940
        }
941
      }
942
    });
943
    close(ctx, promise);
1✔
944
  }
1✔
945

946
  /**
947
   * Handler for a GOAWAY being received. Fails any streams created after the
948
   * last known stream. May only be called during a read.
949
   */
950
  private void goingAway(long errorCode, byte[] debugData) {
951
    Status finalStatus = statusFromH2Error(
1✔
952
        Status.Code.UNAVAILABLE, "GOAWAY shut down transport", errorCode, debugData);
953
    DisconnectError disconnectError = new GoAwayDisconnectError(
1✔
954
        GrpcUtil.Http2Error.forCode(errorCode));
1✔
955
    lifecycleManager.notifyGracefulShutdown(finalStatus, disconnectError);
1✔
956
    abruptGoAwayStatus = statusFromH2Error(
1✔
957
        Status.Code.UNAVAILABLE, "Abrupt GOAWAY closed unsent stream", errorCode, debugData);
958
    // While this _should_ be UNAVAILABLE, Netty uses the wrong stream id in the GOAWAY when it
959
    // fails streams due to HPACK failures (e.g., header list too large). To be more conservative,
960
    // we assume any sent streams may be related to the GOAWAY. This should rarely impact users
961
    // since the main time servers should use abrupt GOAWAYs if there is a protocol error, and if
962
    // there wasn't a protocol error the error code was probably NO_ERROR which is mapped to
963
    // UNAVAILABLE. https://github.com/netty/netty/issues/10670
964
    final Status abruptGoAwayStatusConservative = statusFromH2Error(
1✔
965
        null, "Abrupt GOAWAY closed sent stream", errorCode, debugData);
966
    final boolean mayBeHittingNettyBug = errorCode != Http2Error.NO_ERROR.code();
1✔
967
    // Try to allocate as many in-flight streams as possible, to reduce race window of
968
    // https://github.com/grpc/grpc-java/issues/2562 . To be of any help, the server has to
969
    // gracefully shut down the connection with two GOAWAYs. gRPC servers generally send a PING
970
    // after the first GOAWAY, so they can very precisely detect when the GOAWAY has been
971
    // processed and thus this processing must be in-line before processing additional reads.
972

973
    // This can cause reentrancy, but should be minor since it is normal to handle writes in
974
    // response to a read. Also, the call stack is rather shallow at this point
975
    clientWriteQueue.drainNow();
1✔
976
    if (lifecycleManager.notifyShutdown(finalStatus, disconnectError)) {
1✔
977
      // This is for the only RPCs that are actually covered by the GOAWAY error code. All other
978
      // RPCs were not observed by the remote and so should be UNAVAILABLE.
979
      channelInactiveReason = statusFromH2Error(
1✔
980
          null, "Connection closed after GOAWAY", errorCode, debugData);
981
    }
982

983
    final int lastKnownStream = connection().local().lastStreamKnownByPeer();
1✔
984
    try {
985
      connection().forEachActiveStream(new Http2StreamVisitor() {
1✔
986
        @Override
987
        public boolean visit(Http2Stream stream) throws Http2Exception {
988
          if (stream.id() > lastKnownStream) {
1✔
989
            NettyClientStream.TransportState clientStream = clientStream(stream);
1✔
990
            if (clientStream != null) {
1✔
991
              // RpcProgress _should_ be REFUSED, but are being conservative. See comment for
992
              // abruptGoAwayStatusConservative. This does reduce our ability to perform transparent
993
              // retries, but only if something else caused a connection failure.
994
              RpcProgress progress = mayBeHittingNettyBug
1✔
995
                  ? RpcProgress.PROCESSED
1✔
996
                  : RpcProgress.REFUSED;
1✔
997
              clientStream.transportReportStatus(
1✔
998
                  abruptGoAwayStatusConservative, progress, false, new Metadata());
999
            }
1000
            stream.close();
1✔
1001
          }
1002
          return true;
1✔
1003
        }
1004
      });
1005
    } catch (Http2Exception e) {
×
1006
      throw new RuntimeException(e);
×
1007
    }
1✔
1008
  }
1✔
1009

1010
  private void cancelPing(Status s) {
1011
    if (ping != null) {
1✔
1012
      ping.failed(s);
1✔
1013
      ping = null;
1✔
1014
    }
1015
  }
1✔
1016

1017
  /** If {@code statusCode} is non-null, it will be used instead of the http2 error code mapping. */
1018
  private Status statusFromH2Error(
1019
      Status.Code statusCode, String context, long errorCode, byte[] debugData) {
1020
    Status status = GrpcUtil.Http2Error.statusForCode(errorCode);
1✔
1021
    if (statusCode == null) {
1✔
1022
      statusCode = status.getCode();
1✔
1023
    }
1024
    String debugString = "";
1✔
1025
    if (debugData != null && debugData.length > 0) {
1✔
1026
      // If a debug message was provided, use it.
1027
      debugString = ", debug data: " + new String(debugData, UTF_8);
1✔
1028
    }
1029
    return statusCode.toStatus()
1✔
1030
        .withDescription(context + ". " + status.getDescription() + debugString);
1✔
1031
  }
1032

1033
  /**
1034
   * Gets the client stream associated to the given HTTP/2 stream object.
1035
   */
1036
  private NettyClientStream.TransportState clientStream(Http2Stream stream) {
1037
    return stream == null ? null : (NettyClientStream.TransportState) stream.getProperty(streamKey);
1✔
1038
  }
1039

1040
  private int incrementAndGetNextStreamId() throws StatusException {
1041
    int nextStreamId = connection().local().incrementAndGetNextStreamId();
1✔
1042
    if (nextStreamId < 0) {
1✔
1043
      logger.fine("Stream IDs have been exhausted for this connection. "
1✔
1044
              + "Initiating graceful shutdown of the connection.");
1045
      throw EXHAUSTED_STREAMS_STATUS.asException();
1✔
1046
    }
1047
    return nextStreamId;
1✔
1048
  }
1049

1050
  private Http2Stream requireHttp2Stream(int streamId) {
1051
    Http2Stream stream = connection().stream(streamId);
1✔
1052
    if (stream == null) {
1✔
1053
      // This should never happen.
1054
      throw new AssertionError("Stream does not exist: " + streamId);
×
1055
    }
1056
    return stream;
1✔
1057
  }
1058

1059
  private class FrameListener extends Http2FrameAdapter {
1✔
1060
    private boolean firstSettings = true;
1✔
1061

1062
    @Override
1063
    public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) {
1064
      if (firstSettings) {
1✔
1065
        firstSettings = false;
1✔
1066
        attributes = lifecycleManager.filterAttributes(attributes);
1✔
1067
        lifecycleManager.notifyReady();
1✔
1068
      }
1069
    }
1✔
1070

1071
    @Override
1072
    public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
1073
        boolean endOfStream) throws Http2Exception {
1074
      NettyClientHandler.this.onDataRead(streamId, data, padding, endOfStream);
1✔
1075
      return padding;
1✔
1076
    }
1077

1078
    @Override
1079
    public void onHeadersRead(ChannelHandlerContext ctx,
1080
        int streamId,
1081
        Http2Headers headers,
1082
        int streamDependency,
1083
        short weight,
1084
        boolean exclusive,
1085
        int padding,
1086
        boolean endStream) throws Http2Exception {
1087
      NettyClientHandler.this.onHeadersRead(streamId, headers, endStream);
1✔
1088
    }
1✔
1089

1090
    @Override
1091
    public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode)
1092
        throws Http2Exception {
1093
      NettyClientHandler.this.onRstStreamRead(streamId, errorCode);
1✔
1094
    }
1✔
1095

1096
    @Override
1097
    public void onPingAckRead(ChannelHandlerContext ctx, long ackPayload) throws Http2Exception {
1098
      Http2Ping p = ping;
1✔
1099
      if (ackPayload == flowControlPing().payload()) {
1✔
1100
        flowControlPing().updateWindow();
1✔
1101
        logger.log(Level.FINE, "Window: {0}",
1✔
1102
            decoder().flowController().initialWindowSize(connection().connectionStream()));
1✔
1103
      } else if (p != null) {
1✔
1104
        if (p.payload() == ackPayload) {
1✔
1105
          p.complete();
1✔
1106
          ping = null;
1✔
1107
        } else {
1108
          logger.log(Level.WARNING,
1✔
1109
              "Received unexpected ping ack. Expecting {0}, got {1}",
1110
              new Object[] {p.payload(), ackPayload});
1✔
1111
        }
1112
      } else {
1113
        logger.warning("Received unexpected ping ack. No ping outstanding");
×
1114
      }
1115
      if (keepAliveManager != null) {
1✔
1116
        keepAliveManager.onDataReceived();
1✔
1117
      }
1118
    }
1✔
1119

1120
    @Override
1121
    public void onPingRead(ChannelHandlerContext ctx, long data) throws Http2Exception {
1122
      if (keepAliveManager != null) {
1✔
1123
        keepAliveManager.onDataReceived();
×
1124
      }
1125
    }
1✔
1126
  }
1127

1128
  private static class PingCountingFrameWriter extends DecoratingHttp2FrameWriter
1129
      implements AbstractNettyHandler.PingLimiter {
1130
    private int pingCount;
1131

1132
    public PingCountingFrameWriter(Http2FrameWriter delegate) {
1133
      super(delegate);
1✔
1134
    }
1✔
1135

1136
    @Override
1137
    public boolean isPingAllowed() {
1138
      // "3 strikes" may cause the server to complain, so we limit ourselves to 2 or below.
1139
      return pingCount < 2;
1✔
1140
    }
1141

1142
    @Override
1143
    public ChannelFuture writeHeaders(
1144
        ChannelHandlerContext ctx, int streamId, Http2Headers headers,
1145
        int padding, boolean endStream, ChannelPromise promise) {
1146
      pingCount = 0;
1✔
1147
      return super.writeHeaders(ctx, streamId, headers, padding, endStream, promise);
1✔
1148
    }
1149

1150
    @Override
1151
    public ChannelFuture writeHeaders(
1152
        ChannelHandlerContext ctx, int streamId, Http2Headers headers,
1153
        int streamDependency, short weight, boolean exclusive,
1154
        int padding, boolean endStream, ChannelPromise promise) {
1155
      pingCount = 0;
×
1156
      return super.writeHeaders(ctx, streamId, headers, streamDependency, weight, exclusive,
×
1157
          padding, endStream, promise);
1158
    }
1159

1160
    @Override
1161
    public ChannelFuture writeWindowUpdate(
1162
        ChannelHandlerContext ctx, int streamId, int windowSizeIncrement, ChannelPromise promise) {
1163
      pingCount = 0;
1✔
1164
      return super.writeWindowUpdate(ctx, streamId, windowSizeIncrement, promise);
1✔
1165
    }
1166

1167
    @Override
1168
    public ChannelFuture writePing(
1169
        ChannelHandlerContext ctx, boolean ack, long data, ChannelPromise promise) {
1170
      if (!ack) {
1✔
1171
        pingCount++;
1✔
1172
      }
1173
      return super.writePing(ctx, ack, data, promise);
1✔
1174
    }
1175

1176
    @Override
1177
    public ChannelFuture writeData(
1178
        ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endStream,
1179
        ChannelPromise promise) {
1180
      if (data.isReadable()) {
1✔
1181
        pingCount = 0;
1✔
1182
      }
1183
      return super.writeData(ctx, streamId, data, padding, endStream, promise);
1✔
1184
    }
1185
  }
1186
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc