• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #20126

23 Dec 2025 05:06AM UTC coverage: 88.691% (-0.02%) from 88.706%
#20126

push

github

web-flow
opentelemetry: plumb subchannel metrics disconnect error (#12342)

Finishes the remaining work of
[A94](https://github.com/grpc/proposal/pull/485/files), i.e. plumbing the disconnect error.

35472 of 39995 relevant lines covered (88.69%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.58
/../netty/src/main/java/io/grpc/netty/NettyClientHandler.java
1
/*
2
 * Copyright 2014 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.netty;
18

19
import static io.netty.handler.codec.http2.DefaultHttp2LocalFlowController.DEFAULT_WINDOW_UPDATE_RATIO;
20
import static io.netty.util.CharsetUtil.UTF_8;
21
import static io.netty.util.internal.ObjectUtil.checkNotNull;
22

23
import com.google.common.annotations.VisibleForTesting;
24
import com.google.common.base.Preconditions;
25
import com.google.common.base.Stopwatch;
26
import com.google.common.base.Supplier;
27
import com.google.common.base.Ticker;
28
import io.grpc.Attributes;
29
import io.grpc.ChannelLogger;
30
import io.grpc.InternalChannelz;
31
import io.grpc.InternalStatus;
32
import io.grpc.Metadata;
33
import io.grpc.Status;
34
import io.grpc.StatusException;
35
import io.grpc.internal.ClientStreamListener.RpcProgress;
36
import io.grpc.internal.ClientTransport.PingCallback;
37
import io.grpc.internal.DisconnectError;
38
import io.grpc.internal.GoAwayDisconnectError;
39
import io.grpc.internal.GrpcAttributes;
40
import io.grpc.internal.GrpcUtil;
41
import io.grpc.internal.Http2Ping;
42
import io.grpc.internal.InUseStateAggregator;
43
import io.grpc.internal.KeepAliveManager;
44
import io.grpc.internal.SimpleDisconnectError;
45
import io.grpc.internal.TransportTracer;
46
import io.grpc.netty.GrpcHttp2HeadersUtils.GrpcHttp2ClientHeadersDecoder;
47
import io.netty.buffer.ByteBuf;
48
import io.netty.buffer.ByteBufUtil;
49
import io.netty.buffer.Unpooled;
50
import io.netty.channel.Channel;
51
import io.netty.channel.ChannelFuture;
52
import io.netty.channel.ChannelFutureListener;
53
import io.netty.channel.ChannelHandlerContext;
54
import io.netty.channel.ChannelPromise;
55
import io.netty.handler.codec.http2.DecoratingHttp2FrameWriter;
56
import io.netty.handler.codec.http2.DefaultHttp2Connection;
57
import io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder;
58
import io.netty.handler.codec.http2.DefaultHttp2ConnectionEncoder;
59
import io.netty.handler.codec.http2.DefaultHttp2FrameReader;
60
import io.netty.handler.codec.http2.DefaultHttp2FrameWriter;
61
import io.netty.handler.codec.http2.DefaultHttp2HeadersEncoder;
62
import io.netty.handler.codec.http2.DefaultHttp2LocalFlowController;
63
import io.netty.handler.codec.http2.DefaultHttp2RemoteFlowController;
64
import io.netty.handler.codec.http2.Http2CodecUtil;
65
import io.netty.handler.codec.http2.Http2Connection;
66
import io.netty.handler.codec.http2.Http2ConnectionAdapter;
67
import io.netty.handler.codec.http2.Http2ConnectionDecoder;
68
import io.netty.handler.codec.http2.Http2ConnectionEncoder;
69
import io.netty.handler.codec.http2.Http2Error;
70
import io.netty.handler.codec.http2.Http2Exception;
71
import io.netty.handler.codec.http2.Http2FrameAdapter;
72
import io.netty.handler.codec.http2.Http2FrameLogger;
73
import io.netty.handler.codec.http2.Http2FrameReader;
74
import io.netty.handler.codec.http2.Http2FrameWriter;
75
import io.netty.handler.codec.http2.Http2Headers;
76
import io.netty.handler.codec.http2.Http2HeadersDecoder;
77
import io.netty.handler.codec.http2.Http2HeadersEncoder;
78
import io.netty.handler.codec.http2.Http2InboundFrameLogger;
79
import io.netty.handler.codec.http2.Http2OutboundFrameLogger;
80
import io.netty.handler.codec.http2.Http2Settings;
81
import io.netty.handler.codec.http2.Http2Stream;
82
import io.netty.handler.codec.http2.Http2StreamVisitor;
83
import io.netty.handler.codec.http2.StreamBufferingEncoder;
84
import io.netty.handler.codec.http2.UniformStreamByteDistributor;
85
import io.netty.handler.logging.LogLevel;
86
import io.perfmark.PerfMark;
87
import io.perfmark.Tag;
88
import io.perfmark.TaskCloseable;
89
import java.nio.channels.ClosedChannelException;
90
import java.util.LinkedHashMap;
91
import java.util.Map;
92
import java.util.concurrent.Executor;
93
import java.util.logging.Level;
94
import java.util.logging.Logger;
95
import javax.annotation.Nullable;
96

97
/**
98
 * Client-side Netty handler for GRPC processing. All event handlers are executed entirely within
99
 * the context of the Netty Channel thread.
100
 */
101
class NettyClientHandler extends AbstractNettyHandler {
102
  private static final Logger logger = Logger.getLogger(NettyClientHandler.class.getName());
1✔
103
  static boolean enablePerRpcAuthorityCheck =
1✔
104
      GrpcUtil.getFlag("GRPC_ENABLE_PER_RPC_AUTHORITY_CHECK", false);
1✔
105

106
  /**
107
   * A message that simply passes through the channel without any real processing. It is useful to
108
   * check if buffers have been drained and test the health of the channel in a single operation.
109
   */
110
  static final Object NOOP_MESSAGE = new Object();
1✔
111

112
  /**
113
   * Status used when the transport has exhausted the number of streams.
114
   */
115
  private static final Status EXHAUSTED_STREAMS_STATUS =
1✔
116
          Status.UNAVAILABLE.withDescription("Stream IDs have been exhausted");
1✔
117
  private static final long USER_PING_PAYLOAD = 1111;
118

119
  private final Http2Connection.PropertyKey streamKey;
120
  private final ClientTransportLifecycleManager lifecycleManager;
121
  private final KeepAliveManager keepAliveManager;
122
  // Returns new unstarted stopwatches
123
  private final Supplier<Stopwatch> stopwatchFactory;
124
  private final TransportTracer transportTracer;
125
  private final Attributes eagAttributes;
126
  private final String authority;
127
  private final InUseStateAggregator<Http2Stream> inUseState =
1✔
128
      new InUseStateAggregator<Http2Stream>() {
1✔
129
        @Override
130
        protected void handleInUse() {
131
          lifecycleManager.notifyInUse(true);
1✔
132
        }
1✔
133

134
        @Override
135
        protected void handleNotInUse() {
136
          lifecycleManager.notifyInUse(false);
1✔
137
        }
1✔
138
      };
139
  private final Map<String, Status> peerVerificationResults =
1✔
140
      new LinkedHashMap<String, Status>() {
1✔
141
        @Override
142
        protected boolean removeEldestEntry(Map.Entry<String, Status> eldest) {
143
          return size() > 100;
1✔
144
        }
145
      };
146

147
  private WriteQueue clientWriteQueue;
148
  private Http2Ping ping;
149
  private Attributes attributes;
150
  private InternalChannelz.Security securityInfo;
151
  private Status abruptGoAwayStatus;
152
  private Status channelInactiveReason;
153

154
  static NettyClientHandler newHandler(
155
      ClientTransportLifecycleManager lifecycleManager,
156
      @Nullable KeepAliveManager keepAliveManager,
157
      boolean autoFlowControl,
158
      int flowControlWindow,
159
      int maxHeaderListSize,
160
      int softLimitHeaderListSize,
161
      Supplier<Stopwatch> stopwatchFactory,
162
      Runnable tooManyPingsRunnable,
163
      TransportTracer transportTracer,
164
      Attributes eagAttributes,
165
      String authority,
166
      ChannelLogger negotiationLogger,
167
      Ticker ticker) {
168
    Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive");
1✔
169
    Http2HeadersDecoder headersDecoder = new GrpcHttp2ClientHeadersDecoder(maxHeaderListSize);
1✔
170
    Http2FrameReader frameReader = new DefaultHttp2FrameReader(headersDecoder);
1✔
171
    Http2HeadersEncoder encoder = new DefaultHttp2HeadersEncoder(
1✔
172
        Http2HeadersEncoder.NEVER_SENSITIVE, false, 16, Integer.MAX_VALUE);
173
    Http2FrameWriter frameWriter = new DefaultHttp2FrameWriter(encoder);
1✔
174
    Http2Connection connection = new DefaultHttp2Connection(false);
1✔
175
    UniformStreamByteDistributor dist = new UniformStreamByteDistributor(connection);
1✔
176
    dist.minAllocationChunk(MIN_ALLOCATED_CHUNK); // Increased for benchmarks performance.
1✔
177
    DefaultHttp2RemoteFlowController controller =
1✔
178
        new DefaultHttp2RemoteFlowController(connection, dist);
179
    connection.remote().flowController(controller);
1✔
180

181
    return newHandler(
1✔
182
        connection,
183
        frameReader,
184
        frameWriter,
185
        lifecycleManager,
186
        keepAliveManager,
187
        autoFlowControl,
188
        flowControlWindow,
189
        maxHeaderListSize,
190
        softLimitHeaderListSize,
191
        stopwatchFactory,
192
        tooManyPingsRunnable,
193
        transportTracer,
194
        eagAttributes,
195
        authority,
196
        negotiationLogger,
197
        ticker);
198
  }
199

200
  @VisibleForTesting
201
  static NettyClientHandler newHandler(
202
      final Http2Connection connection,
203
      Http2FrameReader frameReader,
204
      Http2FrameWriter frameWriter,
205
      ClientTransportLifecycleManager lifecycleManager,
206
      KeepAliveManager keepAliveManager,
207
      boolean autoFlowControl,
208
      int flowControlWindow,
209
      int maxHeaderListSize,
210
      int softLimitHeaderListSize,
211
      Supplier<Stopwatch> stopwatchFactory,
212
      Runnable tooManyPingsRunnable,
213
      TransportTracer transportTracer,
214
      Attributes eagAttributes,
215
      String authority,
216
      ChannelLogger negotiationLogger,
217
      Ticker ticker) {
218
    Preconditions.checkNotNull(connection, "connection");
1✔
219
    Preconditions.checkNotNull(frameReader, "frameReader");
1✔
220
    Preconditions.checkNotNull(lifecycleManager, "lifecycleManager");
1✔
221
    Preconditions.checkArgument(flowControlWindow > 0, "flowControlWindow must be positive");
1✔
222
    Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive");
1✔
223
    Preconditions.checkArgument(softLimitHeaderListSize > 0,
1✔
224
        "softLimitHeaderListSize must be positive");
225
    Preconditions.checkNotNull(stopwatchFactory, "stopwatchFactory");
1✔
226
    Preconditions.checkNotNull(tooManyPingsRunnable, "tooManyPingsRunnable");
1✔
227
    Preconditions.checkNotNull(eagAttributes, "eagAttributes");
1✔
228
    Preconditions.checkNotNull(authority, "authority");
1✔
229

230
    Http2FrameLogger frameLogger = new Http2FrameLogger(LogLevel.DEBUG, NettyClientHandler.class);
1✔
231
    frameReader = new Http2InboundFrameLogger(frameReader, frameLogger);
1✔
232
    frameWriter = new Http2OutboundFrameLogger(frameWriter, frameLogger);
1✔
233

234
    PingCountingFrameWriter pingCounter;
235
    frameWriter = pingCounter = new PingCountingFrameWriter(frameWriter);
1✔
236

237
    StreamBufferingEncoder encoder =
1✔
238
        new StreamBufferingEncoder(
239
            new DefaultHttp2ConnectionEncoder(connection, frameWriter));
240

241
    // Create the local flow controller configured to auto-refill the connection window.
242
    connection.local().flowController(
1✔
243
        new DefaultHttp2LocalFlowController(connection, DEFAULT_WINDOW_UPDATE_RATIO, true));
244

245
    Http2ConnectionDecoder decoder = new DefaultHttp2ConnectionDecoder(connection, encoder,
1✔
246
        frameReader);
247

248
    transportTracer.setFlowControlWindowReader(new Utils.FlowControlReader(connection));
1✔
249

250
    Http2Settings settings = new Http2Settings();
1✔
251
    settings.pushEnabled(false);
1✔
252
    settings.initialWindowSize(flowControlWindow);
1✔
253
    settings.maxConcurrentStreams(0);
1✔
254
    settings.maxHeaderListSize(maxHeaderListSize);
1✔
255

256
    return new NettyClientHandler(
1✔
257
        decoder,
258
        encoder,
259
        settings,
260
        negotiationLogger,
261
        lifecycleManager,
262
        keepAliveManager,
263
        stopwatchFactory,
264
        tooManyPingsRunnable,
265
        transportTracer,
266
        eagAttributes,
267
        authority,
268
        autoFlowControl,
269
        pingCounter,
270
        ticker,
271
        maxHeaderListSize,
272
        softLimitHeaderListSize);
273
  }
274

275
  /**
   * Stores the collaborators, seeds the transport attributes with the EAG attributes, installs
   * the frame listener on the decoder, and registers a connection listener that reacts to
   * GOAWAY frames and to streams becoming active or closed.
   */
  private NettyClientHandler(
      Http2ConnectionDecoder decoder,
      Http2ConnectionEncoder encoder,
      Http2Settings settings,
      ChannelLogger negotiationLogger,
      ClientTransportLifecycleManager lifecycleManager,
      KeepAliveManager keepAliveManager,
      Supplier<Stopwatch> stopwatchFactory,
      final Runnable tooManyPingsRunnable,
      TransportTracer transportTracer,
      Attributes eagAttributes,
      String authority,
      boolean autoFlowControl,
      PingLimiter pingLimiter,
      Ticker ticker,
      int maxHeaderListSize,
      int softLimitHeaderListSize) {
    super(
        /* channelUnused= */ null,
        decoder,
        encoder,
        settings,
        negotiationLogger,
        autoFlowControl,
        pingLimiter,
        ticker,
        maxHeaderListSize,
        softLimitHeaderListSize);
    this.lifecycleManager = lifecycleManager;
    this.keepAliveManager = keepAliveManager;
    this.stopwatchFactory = stopwatchFactory;
    this.transportTracer = Preconditions.checkNotNull(transportTracer);
    this.eagAttributes = eagAttributes;
    this.authority = authority;
    this.attributes =
        Attributes.newBuilder().set(GrpcAttributes.ATTR_CLIENT_EAG_ATTRS, eagAttributes).build();

    // Set the frame listener on the decoder.
    decoder().frameListener(new FrameListener());

    Http2Connection http2Connection = encoder.connection();
    streamKey = http2Connection.newKey();

    http2Connection.addListener(new Http2ConnectionAdapter() {
      @Override
      public void onGoAwayReceived(int lastStreamId, long errorCode, ByteBuf debugData) {
        byte[] debugBytes = ByteBufUtil.getBytes(debugData);
        goingAway(errorCode, debugBytes);
        if (errorCode != Http2Error.ENHANCE_YOUR_CALM.code()) {
          return;
        }
        String debugText = new String(debugBytes, UTF_8);
        logger.log(
            Level.WARNING, "Received GOAWAY with ENHANCE_YOUR_CALM. Debug data: {0}", debugText);
        if ("too_many_pings".equals(debugText)) {
          tooManyPingsRunnable.run();
        }
      }

      @Override
      public void onStreamActive(Http2Stream stream) {
        // The first active stream moves the keep-alive manager to the active state.
        if (connection().numActiveStreams() == 1
            && NettyClientHandler.this.keepAliveManager != null) {
          NettyClientHandler.this.keepAliveManager.onTransportActive();
        }
      }

      @Override
      public void onStreamClosed(Http2Stream stream) {
        // Although streams with CALL_OPTIONS_RPC_OWNED_BY_BALANCER are not marked as "in-use" in
        // the first place, we don't propagate that option here, and it's safe to reset the in-use
        // state for them, which will be a cheap no-op.
        inUseState.updateObjectInUse(stream, false);
        // The last stream closing moves the keep-alive manager to the idle state.
        if (connection().numActiveStreams() == 0
            && NettyClientHandler.this.keepAliveManager != null) {
          NettyClientHandler.this.keepAliveManager.onTransportIdle();
        }
      }
    });
  }
1✔
354

355
  /**
356
   * The protocol negotiation attributes, available once the protocol negotiation completes;
357
   * otherwise returns {@code Attributes.EMPTY}.
358
   */
359
  Attributes getAttributes() {
360
    return attributes;
1✔
361
  }
362

363
  /**
364
   * Handler for commands sent from the stream.
365
   */
366
  @Override
367
  public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
368
          throws Exception {
369
    if (msg instanceof CreateStreamCommand) {
1✔
370
      createStream((CreateStreamCommand) msg, promise);
1✔
371
    } else if (msg instanceof SendGrpcFrameCommand) {
1✔
372
      sendGrpcFrame(ctx, (SendGrpcFrameCommand) msg, promise);
1✔
373
    } else if (msg instanceof CancelClientStreamCommand) {
1✔
374
      cancelStream(ctx, (CancelClientStreamCommand) msg, promise);
1✔
375
    } else if (msg instanceof SendPingCommand) {
1✔
376
      sendPingFrame(ctx, (SendPingCommand) msg, promise);
1✔
377
    } else if (msg instanceof GracefulCloseCommand) {
1✔
378
      gracefulClose(ctx, (GracefulCloseCommand) msg, promise);
1✔
379
    } else if (msg instanceof ForcefulCloseCommand) {
1✔
380
      forcefulClose(ctx, (ForcefulCloseCommand) msg, promise);
1✔
381
    } else if (msg == NOOP_MESSAGE) {
1✔
382
      ctx.write(Unpooled.EMPTY_BUFFER, promise);
1✔
383
    } else {
384
      throw new AssertionError("Write called for unexpected type: " + msg.getClass().getName());
×
385
    }
386
  }
1✔
387

388
  void startWriteQueue(Channel channel) {
389
    clientWriteQueue = new WriteQueue(channel);
1✔
390
  }
1✔
391

392
  WriteQueue getWriteQueue() {
393
    return clientWriteQueue;
1✔
394
  }
395

396
  ClientTransportLifecycleManager getLifecycleManager() {
397
    return lifecycleManager;
1✔
398
  }
399

400
  /**
401
   * Returns the given processed bytes back to inbound flow control.
402
   */
403
  void returnProcessedBytes(Http2Stream stream, int bytes) {
404
    try {
405
      decoder().flowController().consumeBytes(stream, bytes);
1✔
406
    } catch (Http2Exception e) {
×
407
      throw new RuntimeException(e);
×
408
    }
1✔
409
  }
1✔
410

411
  private void onHeadersRead(int streamId, Http2Headers headers, boolean endStream) {
412
    // Stream 1 is reserved for the Upgrade response, so we should ignore its headers here:
413
    if (streamId != Http2CodecUtil.HTTP_UPGRADE_STREAM_ID) {
1✔
414
      NettyClientStream.TransportState stream = clientStream(requireHttp2Stream(streamId));
1✔
415
      PerfMark.event("NettyClientHandler.onHeadersRead", stream.tag());
1✔
416
      // check metadata size vs soft limit
417
      int h2HeadersSize = Utils.getH2HeadersSize(headers);
1✔
418
      boolean shouldFail =
1✔
419
          Utils.shouldRejectOnMetadataSizeSoftLimitExceeded(
1✔
420
              h2HeadersSize, softLimitHeaderListSize, maxHeaderListSize);
421
      if (shouldFail && endStream) {
1✔
422
        stream.transportReportStatus(Status.RESOURCE_EXHAUSTED
×
423
            .withDescription(
×
424
                String.format(
×
425
                    "Server Status + Trailers of size %d exceeded Metadata size soft limit: %d",
426
                    h2HeadersSize,
×
427
                    softLimitHeaderListSize)), true, new Metadata());
×
428
        return;
×
429
      } else if (shouldFail) {
1✔
430
        stream.transportReportStatus(Status.RESOURCE_EXHAUSTED
1✔
431
            .withDescription(
1✔
432
                String.format(
1✔
433
                    "Server Headers of size %d exceeded Metadata size soft limit: %d",
434
                    h2HeadersSize,
1✔
435
                    softLimitHeaderListSize)), true, new Metadata());
1✔
436
        return;
1✔
437
      }
438
      stream.transportHeadersReceived(headers, endStream);
1✔
439
    }
440

441
    if (keepAliveManager != null) {
1✔
442
      keepAliveManager.onDataReceived();
1✔
443
    }
444
  }
1✔
445

446
  /**
447
   * Handler for an inbound HTTP/2 DATA frame.
448
   */
449
  private void onDataRead(int streamId, ByteBuf data, int padding, boolean endOfStream) {
450
    flowControlPing().onDataRead(data.readableBytes(), padding);
1✔
451
    NettyClientStream.TransportState stream = clientStream(requireHttp2Stream(streamId));
1✔
452
    PerfMark.event("NettyClientHandler.onDataRead", stream.tag());
1✔
453
    stream.transportDataReceived(data, endOfStream);
1✔
454
    if (keepAliveManager != null) {
1✔
455
      keepAliveManager.onDataReceived();
1✔
456
    }
457
  }
1✔
458

459
  /**
460
   * Handler for an inbound HTTP/2 RST_STREAM frame, terminating a stream.
461
   */
462
  private void onRstStreamRead(int streamId, long errorCode) {
463
    NettyClientStream.TransportState stream = clientStream(connection().stream(streamId));
1✔
464
    if (stream != null) {
1✔
465
      PerfMark.event("NettyClientHandler.onRstStreamRead", stream.tag());
1✔
466
      Status status = statusFromH2Error(null, "RST_STREAM closed stream", errorCode, null);
1✔
467
      stream.transportReportStatus(
1✔
468
          status,
469
          errorCode == Http2Error.REFUSED_STREAM.code()
1✔
470
              ? RpcProgress.REFUSED : RpcProgress.PROCESSED,
1✔
471
          false /*stop delivery*/,
472
          new Metadata());
473
      if (keepAliveManager != null) {
1✔
474
        keepAliveManager.onDataReceived();
1✔
475
      }
476
    }
477
  }
1✔
478

479
  @Override
480
  public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
481
    logger.fine("Network channel being closed by the application.");
1✔
482
    if (ctx.channel().isActive()) { // Ignore notification that the socket was closed
1✔
483
      lifecycleManager.notifyShutdown(
1✔
484
          Status.UNAVAILABLE.withDescription("Transport closed for unknown reason"),
1✔
485
          SimpleDisconnectError.UNKNOWN);
486
    }
487
    super.close(ctx, promise);
1✔
488
  }
1✔
489

490
  /**
491
   * Handler for the Channel shutting down.
492
   */
493
  @Override
494
  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
495
    try {
496
      logger.fine("Network channel is closed");
1✔
497
      Status status = Status.UNAVAILABLE.withDescription("Network closed for unknown reason");
1✔
498
      lifecycleManager.notifyShutdown(status, SimpleDisconnectError.UNKNOWN);
1✔
499
      final Status streamStatus;
500
      if (channelInactiveReason != null) {
1✔
501
        streamStatus = channelInactiveReason;
1✔
502
      } else {
503
        streamStatus = lifecycleManager.getShutdownStatus();
1✔
504
      }
505
      try {
506
        cancelPing(lifecycleManager.getShutdownStatus());
1✔
507
        // Report status to the application layer for any open streams
508
        connection().forEachActiveStream(new Http2StreamVisitor() {
1✔
509
          @Override
510
          public boolean visit(Http2Stream stream) throws Http2Exception {
511
            NettyClientStream.TransportState clientStream = clientStream(stream);
1✔
512
            if (clientStream != null) {
1✔
513
              clientStream.transportReportStatus(streamStatus, false, new Metadata());
1✔
514
            }
515
            return true;
1✔
516
          }
517
        });
518
      } finally {
519
        lifecycleManager.notifyTerminated(status, SimpleDisconnectError.UNKNOWN);
1✔
520
      }
521
    } finally {
522
      // Close any open streams
523
      super.channelInactive(ctx);
1✔
524
      if (keepAliveManager != null) {
1✔
525
        keepAliveManager.onTransportTermination();
1✔
526
      }
527
    }
528
  }
1✔
529

530
  @Override
531
  public void handleProtocolNegotiationCompleted(
532
      Attributes attributes, InternalChannelz.Security securityInfo) {
533
    this.attributes = this.attributes.toBuilder().setAll(attributes).build();
1✔
534
    this.securityInfo = securityInfo;
1✔
535
    super.handleProtocolNegotiationCompleted(attributes, securityInfo);
1✔
536
    writeBufferingAndRemove(ctx().channel());
1✔
537
  }
1✔
538

539
  static void writeBufferingAndRemove(Channel channel) {
540
    checkNotNull(channel, "channel");
1✔
541
    ChannelHandlerContext handlerCtx =
1✔
542
        channel.pipeline().context(WriteBufferingAndExceptionHandler.class);
1✔
543
    if (handlerCtx == null) {
1✔
544
      return;
1✔
545
    }
546
    ((WriteBufferingAndExceptionHandler) handlerCtx.handler()).writeBufferedAndRemove(handlerCtx);
1✔
547
  }
1✔
548

549
  @Override
550
  public Attributes getEagAttributes() {
551
    return eagAttributes;
1✔
552
  }
553

554
  @Override
555
  public String getAuthority() {
556
    return authority;
1✔
557
  }
558

559
  InternalChannelz.Security getSecurityInfo() {
560
    return securityInfo;
1✔
561
  }
562

563
  @Override
564
  protected void onConnectionError(ChannelHandlerContext ctx,  boolean outbound, Throwable cause,
565
      Http2Exception http2Ex) {
566
    logger.log(Level.FINE, "Caught a connection error", cause);
1✔
567
    lifecycleManager.notifyShutdown(Utils.statusFromThrowable(cause),
1✔
568
        SimpleDisconnectError.SOCKET_ERROR);
569
    // Parent class will shut down the Channel
570
    super.onConnectionError(ctx, outbound, cause, http2Ex);
1✔
571
  }
1✔
572

573
  @Override
574
  protected void onStreamError(ChannelHandlerContext ctx, boolean outbound, Throwable cause,
575
      Http2Exception.StreamException http2Ex) {
576
    // Close the stream with a status that contains the cause.
577
    NettyClientStream.TransportState stream = clientStream(connection().stream(http2Ex.streamId()));
1✔
578
    if (stream != null) {
1✔
579
      stream.transportReportStatus(Utils.statusFromThrowable(cause), false, new Metadata());
1✔
580
    } else {
581
      logger.log(Level.FINE, "Stream error for unknown stream " + http2Ex.streamId(), cause);
1✔
582
    }
583

584
    // Delegate to the base class to send a RST_STREAM.
585
    super.onStreamError(ctx, outbound, cause, http2Ex);
1✔
586
  }
1✔
587

588
  @Override
589
  protected boolean isGracefulShutdownComplete() {
590
    // Only allow graceful shutdown to complete after all pending streams have completed.
591
    return super.isGracefulShutdownComplete()
1✔
592
        && ((StreamBufferingEncoder) encoder()).numBufferedStreams() == 0;
1✔
593
  }
594

595
  /**
596
   * Attempts to create a new stream from the given command. If there are too many active streams,
597
   * the creation request is queued.
598
   */
599
  private void createStream(CreateStreamCommand command, ChannelPromise promise)
600
          throws Exception {
601
    if (lifecycleManager.getShutdownStatus() != null) {
1✔
602
      command.stream().setNonExistent();
1✔
603
      // The connection is going away (it is really the GOAWAY case),
604
      // just terminate the stream now.
605
      command.stream().transportReportStatus(
1✔
606
          lifecycleManager.getShutdownStatus(), RpcProgress.MISCARRIED, true, new Metadata());
1✔
607
      promise.setFailure(InternalStatus.asRuntimeExceptionWithoutStacktrace(
1✔
608
              lifecycleManager.getShutdownStatus(), null));
1✔
609
      return;
1✔
610
    }
611

612
    CharSequence authorityHeader = command.headers().authority();
1✔
613
    if (authorityHeader == null) {
1✔
614
      Status authorityVerificationStatus = Status.UNAVAILABLE.withDescription(
1✔
615
              "Missing authority header");
616
      command.stream().setNonExistent();
1✔
617
      command.stream().transportReportStatus(
1✔
618
              Status.UNAVAILABLE, RpcProgress.PROCESSED, true, new Metadata());
619
      promise.setFailure(InternalStatus.asRuntimeExceptionWithoutStacktrace(
1✔
620
              authorityVerificationStatus, null));
621
      return;
1✔
622
    }
623
    // No need to verify authority for the rpc outgoing header if it is same as the authority
624
    // for the transport
625
    if (!authority.contentEquals(authorityHeader)) {
1✔
626
      Status authorityVerificationStatus = peerVerificationResults.get(
1✔
627
              authorityHeader.toString());
1✔
628
      if (authorityVerificationStatus == null) {
1✔
629
        if (attributes.get(GrpcAttributes.ATTR_AUTHORITY_VERIFIER) == null) {
1✔
630
          authorityVerificationStatus = Status.UNAVAILABLE.withDescription(
1✔
631
                  "Authority verifier not found to verify authority");
632
          command.stream().setNonExistent();
1✔
633
          command.stream().transportReportStatus(
1✔
634
                  authorityVerificationStatus, RpcProgress.PROCESSED, true, new Metadata());
635
          promise.setFailure(InternalStatus.asRuntimeExceptionWithoutStacktrace(
1✔
636
                  authorityVerificationStatus, null));
637
          return;
1✔
638
        }
639
        authorityVerificationStatus = attributes.get(GrpcAttributes.ATTR_AUTHORITY_VERIFIER)
1✔
640
                .verifyAuthority(authorityHeader.toString());
1✔
641
        peerVerificationResults.put(authorityHeader.toString(), authorityVerificationStatus);
1✔
642
        if (!authorityVerificationStatus.isOk() && !enablePerRpcAuthorityCheck) {
1✔
643
          logger.log(Level.WARNING, String.format("%s.%s",
1✔
644
                          authorityVerificationStatus.getDescription(),
1✔
645
                          enablePerRpcAuthorityCheck
1✔
646
                                  ? "" : " This will be an error in the future."),
1✔
647
                  InternalStatus.asRuntimeExceptionWithoutStacktrace(
1✔
648
                          authorityVerificationStatus, null));
649
        }
650
      }
651
      if (!authorityVerificationStatus.isOk()) {
1✔
652
        if (enablePerRpcAuthorityCheck) {
1✔
653
          command.stream().setNonExistent();
1✔
654
          command.stream().transportReportStatus(
1✔
655
                  authorityVerificationStatus, RpcProgress.PROCESSED, true, new Metadata());
656
          promise.setFailure(InternalStatus.asRuntimeExceptionWithoutStacktrace(
1✔
657
                  authorityVerificationStatus, null));
658
          return;
1✔
659
        }
660
      }
661
    }
662
    // Get the stream ID for the new stream.
663
    int streamId;
664
    try {
665
      streamId = incrementAndGetNextStreamId();
1✔
666
    } catch (StatusException e) {
1✔
667
      command.stream().setNonExistent();
1✔
668
      // Stream IDs have been exhausted for this connection. Fail the promise immediately.
669
      promise.setFailure(e);
1✔
670

671
      // Initiate a graceful shutdown if we haven't already.
672
      if (!connection().goAwaySent()) {
1✔
673
        logger.fine("Stream IDs have been exhausted for this connection. "
1✔
674
                + "Initiating graceful shutdown of the connection.");
675
        lifecycleManager.notifyShutdown(e.getStatus(), SimpleDisconnectError.UNKNOWN);
1✔
676
        close(ctx(), ctx().newPromise());
1✔
677
      }
678
      return;
1✔
679
    }
1✔
680
    if (connection().goAwayReceived()) {
1✔
681
      Status s = abruptGoAwayStatus;
1✔
682
      int maxActiveStreams = connection().local().maxActiveStreams();
1✔
683
      int lastStreamId = connection().local().lastStreamKnownByPeer();
1✔
684
      if (s == null) {
1✔
685
        // Should be impossible, but handle pseudo-gracefully
686
        s = Status.INTERNAL.withDescription(
×
687
            "Failed due to abrupt GOAWAY, but can't find GOAWAY details");
688
      } else if (streamId > lastStreamId) {
1✔
689
        s = s.augmentDescription(
1✔
690
            "stream id: " + streamId + ", GOAWAY Last-Stream-ID:" + lastStreamId);
691
      } else if (connection().local().numActiveStreams() == maxActiveStreams) {
1✔
692
        s = s.augmentDescription("At MAX_CONCURRENT_STREAMS limit. limit: " + maxActiveStreams);
1✔
693
      }
694
      if (streamId > lastStreamId || connection().local().numActiveStreams() == maxActiveStreams) {
1✔
695
        // This should only be reachable during onGoAwayReceived, as otherwise
696
        // getShutdownThrowable() != null
697
        command.stream().setNonExistent();
1✔
698
        command.stream().transportReportStatus(s, RpcProgress.MISCARRIED, true, new Metadata());
1✔
699
        promise.setFailure(s.asRuntimeException());
1✔
700
        return;
1✔
701
      }
702
    }
703

704
    NettyClientStream.TransportState stream = command.stream();
1✔
705
    Http2Headers headers = command.headers();
1✔
706
    stream.setId(streamId);
1✔
707

708
    try (TaskCloseable ignore = PerfMark.traceTask("NettyClientHandler.createStream")) {
1✔
709
      PerfMark.linkIn(command.getLink());
1✔
710
      PerfMark.attachTag(stream.tag());
1✔
711
      createStreamTraced(
1✔
712
          streamId, stream, headers, command.isGet(), command.shouldBeCountedForInUse(), promise);
1✔
713
    }
714
  }
1✔
715

716
  private void createStreamTraced(
717
      final int streamId,
718
      final NettyClientStream.TransportState stream,
719
      final Http2Headers headers,
720
      boolean isGet,
721
      final boolean shouldBeCountedForInUse,
722
      final ChannelPromise promise) {
723
    // Create an intermediate promise so that we can intercept the failure reported back to the
724
    // application.
725
    ChannelPromise tempPromise = ctx().newPromise();
1✔
726
    encoder().writeHeaders(ctx(), streamId, headers, 0, isGet, tempPromise)
1✔
727
        .addListener(new ChannelFutureListener() {
1✔
728
          @Override
729
          public void operationComplete(ChannelFuture future) throws Exception {
730
            if (future.isSuccess()) {
1✔
731
              // The http2Stream will be null in case a stream buffered in the encoder
732
              // was canceled via RST_STREAM.
733
              Http2Stream http2Stream = connection().stream(streamId);
1✔
734
              if (http2Stream != null) {
1✔
735
                stream.getStatsTraceContext().clientOutboundHeaders();
1✔
736
                http2Stream.setProperty(streamKey, stream);
1✔
737

738
                // This delays the in-use state until the I/O completes, which technically may
739
                // be later than we would like.
740
                if (shouldBeCountedForInUse) {
1✔
741
                  inUseState.updateObjectInUse(http2Stream, true);
1✔
742
                }
743

744
                // Attach the client stream to the HTTP/2 stream object as user data.
745
                stream.setHttp2Stream(http2Stream);
1✔
746
                promise.setSuccess();
1✔
747
              } else {
748
                // Otherwise, the stream has been cancelled and Netty is sending a
749
                // RST_STREAM frame which causes it to purge pending writes from the
750
                // flow-controller and delete the http2Stream. The stream listener has already
751
                // been notified of cancellation so there is nothing to do.
752
                //
753
                // This process has been observed to fail in some circumstances, leaving listeners
754
                // unanswered. Ensure that some exception has been delivered consistent with the
755
                // implied RST_STREAM result above.
756
                Status status = Status.INTERNAL.withDescription("unknown stream for connection");
1✔
757
                promise.setFailure(status.asRuntimeException());
1✔
758
              }
759
            } else {
1✔
760
              Throwable cause = future.cause();
1✔
761
              if (cause instanceof StreamBufferingEncoder.Http2GoAwayException) {
1✔
762
                StreamBufferingEncoder.Http2GoAwayException e =
1✔
763
                    (StreamBufferingEncoder.Http2GoAwayException) cause;
764
                Status status = statusFromH2Error(
1✔
765
                    Status.Code.UNAVAILABLE, "GOAWAY closed buffered stream",
766
                    e.errorCode(), e.debugData());
1✔
767
                cause = status.asRuntimeException();
1✔
768
                stream.transportReportStatus(status, RpcProgress.MISCARRIED, true, new Metadata());
1✔
769
              } else if (cause instanceof StreamBufferingEncoder.Http2ChannelClosedException) {
1✔
770
                Status status = lifecycleManager.getShutdownStatus();
1✔
771
                if (status == null) {
1✔
772
                  status = Status.UNAVAILABLE.withCause(cause)
×
773
                      .withDescription("Connection closed while stream is buffered");
×
774
                }
775
                stream.transportReportStatus(status, RpcProgress.MISCARRIED, true, new Metadata());
1✔
776
              }
777
              promise.setFailure(cause);
1✔
778
            }
779
          }
1✔
780
        });
781
    // When the HEADERS are not buffered because of MAX_CONCURRENT_STREAMS in
782
    // StreamBufferingEncoder, the stream is created immediately even if the bytes of the HEADERS
783
    // are delayed because the OS may have too much buffered and isn't accepting the write. The
784
    // write promise is also delayed until flush(). However, we need to associate the netty stream
785
    // with the transport state so that goingAway() and forcefulClose() and able to notify the
786
    // stream of failures.
787
    //
788
    // This leaves a hole when MAX_CONCURRENT_STREAMS is reached, as http2Stream will be null, but
789
    // it is better than nothing.
790
    Http2Stream http2Stream = connection().stream(streamId);
1✔
791
    if (http2Stream != null) {
1✔
792
      http2Stream.setProperty(streamKey, stream);
1✔
793
    }
794
  }
1✔
795

796
  /**
797
   * Cancels this stream.
798
   */
799
  private void cancelStream(ChannelHandlerContext ctx, CancelClientStreamCommand cmd,
800
      ChannelPromise promise) {
801
    NettyClientStream.TransportState stream = cmd.stream();
1✔
802
    try (TaskCloseable ignore = PerfMark.traceTask("NettyClientHandler.cancelStream")) {
1✔
803
      PerfMark.attachTag(stream.tag());
1✔
804
      PerfMark.linkIn(cmd.getLink());
1✔
805
      Status reason = cmd.reason();
1✔
806
      if (reason != null) {
1✔
807
        stream.transportReportStatus(reason, true, new Metadata());
1✔
808
      }
809
      if (!cmd.stream().isNonExistent()) {
1✔
810
        encoder().writeRstStream(ctx, stream.id(), Http2Error.CANCEL.code(), promise);
1✔
811
      } else {
812
        promise.setSuccess();
1✔
813
      }
814
    }
815
  }
1✔
816

817
  /**
818
   * Sends the given GRPC frame for the stream.
819
   */
820
  private void sendGrpcFrame(ChannelHandlerContext ctx, SendGrpcFrameCommand cmd,
821
      ChannelPromise promise) {
822
    try (TaskCloseable ignore = PerfMark.traceTask("NettyClientHandler.sendGrpcFrame")) {
1✔
823
      PerfMark.attachTag(cmd.stream().tag());
1✔
824
      PerfMark.linkIn(cmd.getLink());
1✔
825
      // Call the base class to write the HTTP/2 DATA frame.
826
      // Note: no need to flush since this is handled by the outbound flow controller.
827
      encoder().writeData(ctx, cmd.stream().id(), cmd.content(), 0, cmd.endStream(), promise);
1✔
828
    }
829
  }
1✔
830

831
  private void sendPingFrame(ChannelHandlerContext ctx, SendPingCommand msg,
832
      ChannelPromise promise) {
833
    try (TaskCloseable ignore = PerfMark.traceTask("NettyClientHandler.sendPingFrame")) {
1✔
834
      PerfMark.linkIn(msg.getLink());
1✔
835
      sendPingFrameTraced(ctx, msg, promise);
1✔
836
    }
837
  }
1✔
838

839
  /**
840
   * Sends a PING frame. If a ping operation is already outstanding, the callback in the message is
841
   * registered to be called when the existing operation completes, and no new frame is sent.
842
   */
843
  private void sendPingFrameTraced(ChannelHandlerContext ctx, SendPingCommand msg,
844
      ChannelPromise promise) {
845
    // Don't check lifecycleManager.getShutdownStatus() since we want to allow pings after shutdown
846
    // but before termination. After termination, messages will no longer arrive because the
847
    // pipeline clears all handlers on channel close.
848

849
    PingCallback callback = msg.callback();
1✔
850
    Executor executor = msg.executor();
1✔
851
    // we only allow one outstanding ping at a time, so just add the callback to
852
    // any outstanding operation
853
    if (ping != null) {
1✔
854
      promise.setSuccess();
1✔
855
      ping.addCallback(callback, executor);
1✔
856
      return;
1✔
857
    }
858

859
    // Use a new promise to prevent calling the callback twice on write failure: here and in
860
    // NettyClientTransport.ping(). It may appear strange, but it will behave the same as if
861
    // ping != null above.
862
    promise.setSuccess();
1✔
863
    promise = ctx().newPromise();
1✔
864
    // set outstanding operation
865
    long data = USER_PING_PAYLOAD;
1✔
866
    Stopwatch stopwatch = stopwatchFactory.get();
1✔
867
    stopwatch.start();
1✔
868
    ping = new Http2Ping(data, stopwatch);
1✔
869
    ping.addCallback(callback, executor);
1✔
870
    // and then write the ping
871
    encoder().writePing(ctx, false, USER_PING_PAYLOAD, promise);
1✔
872
    ctx.flush();
1✔
873
    final Http2Ping finalPing = ping;
1✔
874
    promise.addListener(new ChannelFutureListener() {
1✔
875
      @Override
876
      public void operationComplete(ChannelFuture future) throws Exception {
877
        if (future.isSuccess()) {
1✔
878
          transportTracer.reportKeepAliveSent();
1✔
879
          return;
1✔
880
        }
881
        Throwable cause = future.cause();
×
882
        Status status = lifecycleManager.getShutdownStatus();
×
883
        if (cause instanceof ClosedChannelException) {
×
884
          if (status == null) {
×
885
            status = Status.UNKNOWN.withDescription("Ping failed but for unknown reason.")
×
886
                    .withCause(future.cause());
×
887
          }
888
        } else {
889
          status = Utils.statusFromThrowable(cause);
×
890
        }
891
        finalPing.failed(status);
×
892
        if (ping == finalPing) {
×
893
          ping = null;
×
894
        }
895
      }
×
896
    });
897
  }
1✔
898

899
  private void gracefulClose(ChannelHandlerContext ctx, GracefulCloseCommand msg,
900
      ChannelPromise promise) throws Exception {
901
    lifecycleManager.notifyShutdown(msg.getStatus(), SimpleDisconnectError.SUBCHANNEL_SHUTDOWN);
1✔
902
    // Explicitly flush to create any buffered streams before sending GOAWAY.
903
    // TODO(ejona): determine if the need to flush is a bug in Netty
904
    flush(ctx);
1✔
905
    close(ctx, promise);
1✔
906
  }
1✔
907

908
  private void forcefulClose(final ChannelHandlerContext ctx, final ForcefulCloseCommand msg,
909
      ChannelPromise promise) throws Exception {
910
    connection().forEachActiveStream(new Http2StreamVisitor() {
1✔
911
      @Override
912
      public boolean visit(Http2Stream stream) throws Http2Exception {
913
        NettyClientStream.TransportState clientStream = clientStream(stream);
1✔
914
        Tag tag = clientStream != null ? clientStream.tag() : PerfMark.createTag();
1✔
915
        try (TaskCloseable ignore = PerfMark.traceTask("NettyClientHandler.forcefulClose")) {
1✔
916
          PerfMark.linkIn(msg.getLink());
1✔
917
          PerfMark.attachTag(tag);
1✔
918
          if (clientStream != null) {
1✔
919
            clientStream.transportReportStatus(msg.getStatus(), true, new Metadata());
1✔
920
            resetStream(ctx, stream.id(), Http2Error.CANCEL.code(), ctx.newPromise());
1✔
921
          }
922
          stream.close();
1✔
923
          return true;
1✔
924
        }
925
      }
926
    });
927
    close(ctx, promise);
1✔
928
  }
1✔
929

930
  /**
931
   * Handler for a GOAWAY being received. Fails any streams created after the
932
   * last known stream. May only be called during a read.
933
   */
934
  private void goingAway(long errorCode, byte[] debugData) {
935
    Status finalStatus = statusFromH2Error(
1✔
936
        Status.Code.UNAVAILABLE, "GOAWAY shut down transport", errorCode, debugData);
937
    DisconnectError disconnectError = new GoAwayDisconnectError(
1✔
938
        GrpcUtil.Http2Error.forCode(errorCode));
1✔
939
    lifecycleManager.notifyGracefulShutdown(finalStatus, disconnectError);
1✔
940
    abruptGoAwayStatus = statusFromH2Error(
1✔
941
        Status.Code.UNAVAILABLE, "Abrupt GOAWAY closed unsent stream", errorCode, debugData);
942
    // While this _should_ be UNAVAILABLE, Netty uses the wrong stream id in the GOAWAY when it
943
    // fails streams due to HPACK failures (e.g., header list too large). To be more conservative,
944
    // we assume any sent streams may be related to the GOAWAY. This should rarely impact users
945
    // since the main time servers should use abrupt GOAWAYs if there is a protocol error, and if
946
    // there wasn't a protocol error the error code was probably NO_ERROR which is mapped to
947
    // UNAVAILABLE. https://github.com/netty/netty/issues/10670
948
    final Status abruptGoAwayStatusConservative = statusFromH2Error(
1✔
949
        null, "Abrupt GOAWAY closed sent stream", errorCode, debugData);
950
    final boolean mayBeHittingNettyBug = errorCode != Http2Error.NO_ERROR.code();
1✔
951
    // Try to allocate as many in-flight streams as possible, to reduce race window of
952
    // https://github.com/grpc/grpc-java/issues/2562 . To be of any help, the server has to
953
    // gracefully shut down the connection with two GOAWAYs. gRPC servers generally send a PING
954
    // after the first GOAWAY, so they can very precisely detect when the GOAWAY has been
955
    // processed and thus this processing must be in-line before processing additional reads.
956

957
    // This can cause reentrancy, but should be minor since it is normal to handle writes in
958
    // response to a read. Also, the call stack is rather shallow at this point
959
    clientWriteQueue.drainNow();
1✔
960
    if (lifecycleManager.notifyShutdown(finalStatus, disconnectError)) {
1✔
961
      // This is for the only RPCs that are actually covered by the GOAWAY error code. All other
962
      // RPCs were not observed by the remote and so should be UNAVAILABLE.
963
      channelInactiveReason = statusFromH2Error(
1✔
964
          null, "Connection closed after GOAWAY", errorCode, debugData);
965
    }
966

967
    final int lastKnownStream = connection().local().lastStreamKnownByPeer();
1✔
968
    try {
969
      connection().forEachActiveStream(new Http2StreamVisitor() {
1✔
970
        @Override
971
        public boolean visit(Http2Stream stream) throws Http2Exception {
972
          if (stream.id() > lastKnownStream) {
1✔
973
            NettyClientStream.TransportState clientStream = clientStream(stream);
1✔
974
            if (clientStream != null) {
1✔
975
              // RpcProgress _should_ be REFUSED, but are being conservative. See comment for
976
              // abruptGoAwayStatusConservative. This does reduce our ability to perform transparent
977
              // retries, but only if something else caused a connection failure.
978
              RpcProgress progress = mayBeHittingNettyBug
1✔
979
                  ? RpcProgress.PROCESSED
1✔
980
                  : RpcProgress.REFUSED;
1✔
981
              clientStream.transportReportStatus(
1✔
982
                  abruptGoAwayStatusConservative, progress, false, new Metadata());
983
            }
984
            stream.close();
1✔
985
          }
986
          return true;
1✔
987
        }
988
      });
989
    } catch (Http2Exception e) {
×
990
      throw new RuntimeException(e);
×
991
    }
1✔
992
  }
1✔
993

994
  private void cancelPing(Status s) {
995
    if (ping != null) {
1✔
996
      ping.failed(s);
1✔
997
      ping = null;
1✔
998
    }
999
  }
1✔
1000

1001
  /** If {@code statusCode} is non-null, it will be used instead of the http2 error code mapping. */
1002
  private Status statusFromH2Error(
1003
      Status.Code statusCode, String context, long errorCode, byte[] debugData) {
1004
    Status status = GrpcUtil.Http2Error.statusForCode(errorCode);
1✔
1005
    if (statusCode == null) {
1✔
1006
      statusCode = status.getCode();
1✔
1007
    }
1008
    String debugString = "";
1✔
1009
    if (debugData != null && debugData.length > 0) {
1✔
1010
      // If a debug message was provided, use it.
1011
      debugString = ", debug data: " + new String(debugData, UTF_8);
1✔
1012
    }
1013
    return statusCode.toStatus()
1✔
1014
        .withDescription(context + ". " + status.getDescription() + debugString);
1✔
1015
  }
1016

1017
  /**
1018
   * Gets the client stream associated to the given HTTP/2 stream object.
1019
   */
1020
  private NettyClientStream.TransportState clientStream(Http2Stream stream) {
1021
    return stream == null ? null : (NettyClientStream.TransportState) stream.getProperty(streamKey);
1✔
1022
  }
1023

1024
  private int incrementAndGetNextStreamId() throws StatusException {
1025
    int nextStreamId = connection().local().incrementAndGetNextStreamId();
1✔
1026
    if (nextStreamId < 0) {
1✔
1027
      logger.fine("Stream IDs have been exhausted for this connection. "
1✔
1028
              + "Initiating graceful shutdown of the connection.");
1029
      throw EXHAUSTED_STREAMS_STATUS.asException();
1✔
1030
    }
1031
    return nextStreamId;
1✔
1032
  }
1033

1034
  private Http2Stream requireHttp2Stream(int streamId) {
1035
    Http2Stream stream = connection().stream(streamId);
1✔
1036
    if (stream == null) {
1✔
1037
      // This should never happen.
1038
      throw new AssertionError("Stream does not exist: " + streamId);
×
1039
    }
1040
    return stream;
1✔
1041
  }
1042

1043
  private class FrameListener extends Http2FrameAdapter {
1✔
1044
    private boolean firstSettings = true;
1✔
1045

1046
    @Override
1047
    public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) {
1048
      if (firstSettings) {
1✔
1049
        firstSettings = false;
1✔
1050
        attributes = lifecycleManager.filterAttributes(attributes);
1✔
1051
        lifecycleManager.notifyReady();
1✔
1052
      }
1053
    }
1✔
1054

1055
    @Override
1056
    public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
1057
        boolean endOfStream) throws Http2Exception {
1058
      NettyClientHandler.this.onDataRead(streamId, data, padding, endOfStream);
1✔
1059
      return padding;
1✔
1060
    }
1061

1062
    @Override
1063
    public void onHeadersRead(ChannelHandlerContext ctx,
1064
        int streamId,
1065
        Http2Headers headers,
1066
        int streamDependency,
1067
        short weight,
1068
        boolean exclusive,
1069
        int padding,
1070
        boolean endStream) throws Http2Exception {
1071
      NettyClientHandler.this.onHeadersRead(streamId, headers, endStream);
1✔
1072
    }
1✔
1073

1074
    @Override
1075
    public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode)
1076
        throws Http2Exception {
1077
      NettyClientHandler.this.onRstStreamRead(streamId, errorCode);
1✔
1078
    }
1✔
1079

1080
    @Override
1081
    public void onPingAckRead(ChannelHandlerContext ctx, long ackPayload) throws Http2Exception {
1082
      Http2Ping p = ping;
1✔
1083
      if (ackPayload == flowControlPing().payload()) {
1✔
1084
        flowControlPing().updateWindow();
1✔
1085
        logger.log(Level.FINE, "Window: {0}",
1✔
1086
            decoder().flowController().initialWindowSize(connection().connectionStream()));
1✔
1087
      } else if (p != null) {
1✔
1088
        if (p.payload() == ackPayload) {
1✔
1089
          p.complete();
1✔
1090
          ping = null;
1✔
1091
        } else {
1092
          logger.log(Level.WARNING,
1✔
1093
              "Received unexpected ping ack. Expecting {0}, got {1}",
1094
              new Object[] {p.payload(), ackPayload});
1✔
1095
        }
1096
      } else {
1097
        logger.warning("Received unexpected ping ack. No ping outstanding");
×
1098
      }
1099
      if (keepAliveManager != null) {
1✔
1100
        keepAliveManager.onDataReceived();
1✔
1101
      }
1102
    }
1✔
1103

1104
    @Override
1105
    public void onPingRead(ChannelHandlerContext ctx, long data) throws Http2Exception {
1106
      if (keepAliveManager != null) {
1✔
1107
        keepAliveManager.onDataReceived();
×
1108
      }
1109
    }
1✔
1110
  }
1111

1112
  private static class PingCountingFrameWriter extends DecoratingHttp2FrameWriter
1113
      implements AbstractNettyHandler.PingLimiter {
1114
    private int pingCount;
1115

1116
    public PingCountingFrameWriter(Http2FrameWriter delegate) {
1117
      super(delegate);
1✔
1118
    }
1✔
1119

1120
    @Override
1121
    public boolean isPingAllowed() {
1122
      // "3 strikes" may cause the server to complain, so we limit ourselves to 2 or below.
1123
      return pingCount < 2;
1✔
1124
    }
1125

1126
    @Override
1127
    public ChannelFuture writeHeaders(
1128
        ChannelHandlerContext ctx, int streamId, Http2Headers headers,
1129
        int padding, boolean endStream, ChannelPromise promise) {
1130
      pingCount = 0;
1✔
1131
      return super.writeHeaders(ctx, streamId, headers, padding, endStream, promise);
1✔
1132
    }
1133

1134
    @Override
1135
    public ChannelFuture writeHeaders(
1136
        ChannelHandlerContext ctx, int streamId, Http2Headers headers,
1137
        int streamDependency, short weight, boolean exclusive,
1138
        int padding, boolean endStream, ChannelPromise promise) {
1139
      pingCount = 0;
×
1140
      return super.writeHeaders(ctx, streamId, headers, streamDependency, weight, exclusive,
×
1141
          padding, endStream, promise);
1142
    }
1143

1144
    @Override
1145
    public ChannelFuture writeWindowUpdate(
1146
        ChannelHandlerContext ctx, int streamId, int windowSizeIncrement, ChannelPromise promise) {
1147
      pingCount = 0;
1✔
1148
      return super.writeWindowUpdate(ctx, streamId, windowSizeIncrement, promise);
1✔
1149
    }
1150

1151
    @Override
1152
    public ChannelFuture writePing(
1153
        ChannelHandlerContext ctx, boolean ack, long data, ChannelPromise promise) {
1154
      if (!ack) {
1✔
1155
        pingCount++;
1✔
1156
      }
1157
      return super.writePing(ctx, ack, data, promise);
1✔
1158
    }
1159

1160
    @Override
1161
    public ChannelFuture writeData(
1162
        ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endStream,
1163
        ChannelPromise promise) {
1164
      if (data.isReadable()) {
1✔
1165
        pingCount = 0;
1✔
1166
      }
1167
      return super.writeData(ctx, streamId, data, padding, endStream, promise);
1✔
1168
    }
1169
  }
1170
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc