• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19918

18 Jul 2025 04:05PM UTC coverage: 88.582% (-0.002%) from 88.584%
#19918

push

github

ejona86
netty: Associate netty stream eagerly to avoid client hang

In #12185, RPCs were randomly hanging. In #12207 this was tracked down
to the headers promise completing successfully, but the netty stream
was null. This was because the headers write hadn't completed but
stream.close() had been called by goingAway().

34656 of 39123 relevant lines covered (88.58%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.55
/../netty/src/main/java/io/grpc/netty/NettyClientHandler.java
1
/*
2
 * Copyright 2014 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.netty;
18

19
import static io.netty.handler.codec.http2.DefaultHttp2LocalFlowController.DEFAULT_WINDOW_UPDATE_RATIO;
20
import static io.netty.util.CharsetUtil.UTF_8;
21
import static io.netty.util.internal.ObjectUtil.checkNotNull;
22

23
import com.google.common.annotations.VisibleForTesting;
24
import com.google.common.base.Preconditions;
25
import com.google.common.base.Stopwatch;
26
import com.google.common.base.Supplier;
27
import com.google.common.base.Ticker;
28
import io.grpc.Attributes;
29
import io.grpc.ChannelLogger;
30
import io.grpc.InternalChannelz;
31
import io.grpc.InternalStatus;
32
import io.grpc.Metadata;
33
import io.grpc.Status;
34
import io.grpc.StatusException;
35
import io.grpc.internal.ClientStreamListener.RpcProgress;
36
import io.grpc.internal.ClientTransport.PingCallback;
37
import io.grpc.internal.GrpcAttributes;
38
import io.grpc.internal.GrpcUtil;
39
import io.grpc.internal.Http2Ping;
40
import io.grpc.internal.InUseStateAggregator;
41
import io.grpc.internal.KeepAliveManager;
42
import io.grpc.internal.TransportTracer;
43
import io.grpc.netty.GrpcHttp2HeadersUtils.GrpcHttp2ClientHeadersDecoder;
44
import io.netty.buffer.ByteBuf;
45
import io.netty.buffer.ByteBufUtil;
46
import io.netty.buffer.Unpooled;
47
import io.netty.channel.Channel;
48
import io.netty.channel.ChannelFuture;
49
import io.netty.channel.ChannelFutureListener;
50
import io.netty.channel.ChannelHandlerContext;
51
import io.netty.channel.ChannelPromise;
52
import io.netty.handler.codec.http2.DecoratingHttp2FrameWriter;
53
import io.netty.handler.codec.http2.DefaultHttp2Connection;
54
import io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder;
55
import io.netty.handler.codec.http2.DefaultHttp2ConnectionEncoder;
56
import io.netty.handler.codec.http2.DefaultHttp2FrameReader;
57
import io.netty.handler.codec.http2.DefaultHttp2FrameWriter;
58
import io.netty.handler.codec.http2.DefaultHttp2HeadersEncoder;
59
import io.netty.handler.codec.http2.DefaultHttp2LocalFlowController;
60
import io.netty.handler.codec.http2.DefaultHttp2RemoteFlowController;
61
import io.netty.handler.codec.http2.Http2CodecUtil;
62
import io.netty.handler.codec.http2.Http2Connection;
63
import io.netty.handler.codec.http2.Http2ConnectionAdapter;
64
import io.netty.handler.codec.http2.Http2ConnectionDecoder;
65
import io.netty.handler.codec.http2.Http2ConnectionEncoder;
66
import io.netty.handler.codec.http2.Http2Error;
67
import io.netty.handler.codec.http2.Http2Exception;
68
import io.netty.handler.codec.http2.Http2FrameAdapter;
69
import io.netty.handler.codec.http2.Http2FrameLogger;
70
import io.netty.handler.codec.http2.Http2FrameReader;
71
import io.netty.handler.codec.http2.Http2FrameWriter;
72
import io.netty.handler.codec.http2.Http2Headers;
73
import io.netty.handler.codec.http2.Http2HeadersDecoder;
74
import io.netty.handler.codec.http2.Http2HeadersEncoder;
75
import io.netty.handler.codec.http2.Http2InboundFrameLogger;
76
import io.netty.handler.codec.http2.Http2OutboundFrameLogger;
77
import io.netty.handler.codec.http2.Http2Settings;
78
import io.netty.handler.codec.http2.Http2Stream;
79
import io.netty.handler.codec.http2.Http2StreamVisitor;
80
import io.netty.handler.codec.http2.StreamBufferingEncoder;
81
import io.netty.handler.codec.http2.UniformStreamByteDistributor;
82
import io.netty.handler.logging.LogLevel;
83
import io.perfmark.PerfMark;
84
import io.perfmark.Tag;
85
import io.perfmark.TaskCloseable;
86
import java.nio.channels.ClosedChannelException;
87
import java.util.LinkedHashMap;
88
import java.util.Map;
89
import java.util.concurrent.Executor;
90
import java.util.logging.Level;
91
import java.util.logging.Logger;
92
import javax.annotation.Nullable;
93

94
/**
95
 * Client-side Netty handler for GRPC processing. All event handlers are executed entirely within
96
 * the context of the Netty Channel thread.
97
 */
98
class NettyClientHandler extends AbstractNettyHandler {
99
  private static final Logger logger = Logger.getLogger(NettyClientHandler.class.getName());
1✔
100
  static boolean enablePerRpcAuthorityCheck =
1✔
101
      GrpcUtil.getFlag("GRPC_ENABLE_PER_RPC_AUTHORITY_CHECK", false);
1✔
102

103
  /**
104
   * A message that simply passes through the channel without any real processing. It is useful to
105
   * check if buffers have been drained and test the health of the channel in a single operation.
106
   */
107
  static final Object NOOP_MESSAGE = new Object();
1✔
108

109
  /**
110
   * Status used when the transport has exhausted the number of streams.
111
   */
112
  private static final Status EXHAUSTED_STREAMS_STATUS =
1✔
113
          Status.UNAVAILABLE.withDescription("Stream IDs have been exhausted");
1✔
114
  private static final long USER_PING_PAYLOAD = 1111;
115

116
  private final Http2Connection.PropertyKey streamKey;
117
  private final ClientTransportLifecycleManager lifecycleManager;
118
  private final KeepAliveManager keepAliveManager;
119
  // Returns new unstarted stopwatches
120
  private final Supplier<Stopwatch> stopwatchFactory;
121
  private final TransportTracer transportTracer;
122
  private final Attributes eagAttributes;
123
  private final String authority;
124
  // Relays the aggregator's in-use / not-in-use callbacks to the lifecycle manager. Streams are
  // reported to it through updateObjectInUse(...), e.g. with {@code false} when a stream closes.
  private final InUseStateAggregator<Http2Stream> inUseState =
      new InUseStateAggregator<Http2Stream>() {
        @Override
        protected void handleInUse() {
          lifecycleManager.notifyInUse(true);
        }

        @Override
        protected void handleNotInUse() {
          lifecycleManager.notifyInUse(false);
        }
      };
136
  // Cache of per-RPC authority verification results, keyed by the authority string. The map is
  // bounded: once an insertion pushes it past 100 entries, the eldest (first-inserted) entry is
  // evicted.
  private final Map<String, Status> peerVerificationResults =
      new LinkedHashMap<String, Status>() {
        @Override
        protected boolean removeEldestEntry(Map.Entry<String, Status> eldest) {
          return size() > 100;
        }
      };
143

144
  private WriteQueue clientWriteQueue;
145
  private Http2Ping ping;
146
  private Attributes attributes;
147
  private InternalChannelz.Security securityInfo;
148
  private Status abruptGoAwayStatus;
149
  private Status channelInactiveReason;
150

151
  static NettyClientHandler newHandler(
152
      ClientTransportLifecycleManager lifecycleManager,
153
      @Nullable KeepAliveManager keepAliveManager,
154
      boolean autoFlowControl,
155
      int flowControlWindow,
156
      int maxHeaderListSize,
157
      int softLimitHeaderListSize,
158
      Supplier<Stopwatch> stopwatchFactory,
159
      Runnable tooManyPingsRunnable,
160
      TransportTracer transportTracer,
161
      Attributes eagAttributes,
162
      String authority,
163
      ChannelLogger negotiationLogger,
164
      Ticker ticker) {
165
    Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive");
1✔
166
    Http2HeadersDecoder headersDecoder = new GrpcHttp2ClientHeadersDecoder(maxHeaderListSize);
1✔
167
    Http2FrameReader frameReader = new DefaultHttp2FrameReader(headersDecoder);
1✔
168
    Http2HeadersEncoder encoder = new DefaultHttp2HeadersEncoder(
1✔
169
        Http2HeadersEncoder.NEVER_SENSITIVE, false, 16, Integer.MAX_VALUE);
170
    Http2FrameWriter frameWriter = new DefaultHttp2FrameWriter(encoder);
1✔
171
    Http2Connection connection = new DefaultHttp2Connection(false);
1✔
172
    UniformStreamByteDistributor dist = new UniformStreamByteDistributor(connection);
1✔
173
    dist.minAllocationChunk(MIN_ALLOCATED_CHUNK); // Increased for benchmarks performance.
1✔
174
    DefaultHttp2RemoteFlowController controller =
1✔
175
        new DefaultHttp2RemoteFlowController(connection, dist);
176
    connection.remote().flowController(controller);
1✔
177

178
    return newHandler(
1✔
179
        connection,
180
        frameReader,
181
        frameWriter,
182
        lifecycleManager,
183
        keepAliveManager,
184
        autoFlowControl,
185
        flowControlWindow,
186
        maxHeaderListSize,
187
        softLimitHeaderListSize,
188
        stopwatchFactory,
189
        tooManyPingsRunnable,
190
        transportTracer,
191
        eagAttributes,
192
        authority,
193
        negotiationLogger,
194
        ticker);
195
  }
196

197
  @VisibleForTesting
198
  static NettyClientHandler newHandler(
199
      final Http2Connection connection,
200
      Http2FrameReader frameReader,
201
      Http2FrameWriter frameWriter,
202
      ClientTransportLifecycleManager lifecycleManager,
203
      KeepAliveManager keepAliveManager,
204
      boolean autoFlowControl,
205
      int flowControlWindow,
206
      int maxHeaderListSize,
207
      int softLimitHeaderListSize,
208
      Supplier<Stopwatch> stopwatchFactory,
209
      Runnable tooManyPingsRunnable,
210
      TransportTracer transportTracer,
211
      Attributes eagAttributes,
212
      String authority,
213
      ChannelLogger negotiationLogger,
214
      Ticker ticker) {
215
    Preconditions.checkNotNull(connection, "connection");
1✔
216
    Preconditions.checkNotNull(frameReader, "frameReader");
1✔
217
    Preconditions.checkNotNull(lifecycleManager, "lifecycleManager");
1✔
218
    Preconditions.checkArgument(flowControlWindow > 0, "flowControlWindow must be positive");
1✔
219
    Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive");
1✔
220
    Preconditions.checkArgument(softLimitHeaderListSize > 0,
1✔
221
        "softLimitHeaderListSize must be positive");
222
    Preconditions.checkNotNull(stopwatchFactory, "stopwatchFactory");
1✔
223
    Preconditions.checkNotNull(tooManyPingsRunnable, "tooManyPingsRunnable");
1✔
224
    Preconditions.checkNotNull(eagAttributes, "eagAttributes");
1✔
225
    Preconditions.checkNotNull(authority, "authority");
1✔
226

227
    Http2FrameLogger frameLogger = new Http2FrameLogger(LogLevel.DEBUG, NettyClientHandler.class);
1✔
228
    frameReader = new Http2InboundFrameLogger(frameReader, frameLogger);
1✔
229
    frameWriter = new Http2OutboundFrameLogger(frameWriter, frameLogger);
1✔
230

231
    PingCountingFrameWriter pingCounter;
232
    frameWriter = pingCounter = new PingCountingFrameWriter(frameWriter);
1✔
233

234
    StreamBufferingEncoder encoder =
1✔
235
        new StreamBufferingEncoder(
236
            new DefaultHttp2ConnectionEncoder(connection, frameWriter));
237

238
    // Create the local flow controller configured to auto-refill the connection window.
239
    connection.local().flowController(
1✔
240
        new DefaultHttp2LocalFlowController(connection, DEFAULT_WINDOW_UPDATE_RATIO, true));
241

242
    Http2ConnectionDecoder decoder = new DefaultHttp2ConnectionDecoder(connection, encoder,
1✔
243
        frameReader);
244

245
    transportTracer.setFlowControlWindowReader(new Utils.FlowControlReader(connection));
1✔
246

247
    Http2Settings settings = new Http2Settings();
1✔
248
    settings.pushEnabled(false);
1✔
249
    settings.initialWindowSize(flowControlWindow);
1✔
250
    settings.maxConcurrentStreams(0);
1✔
251
    settings.maxHeaderListSize(maxHeaderListSize);
1✔
252

253
    return new NettyClientHandler(
1✔
254
        decoder,
255
        encoder,
256
        settings,
257
        negotiationLogger,
258
        lifecycleManager,
259
        keepAliveManager,
260
        stopwatchFactory,
261
        tooManyPingsRunnable,
262
        transportTracer,
263
        eagAttributes,
264
        authority,
265
        autoFlowControl,
266
        pingCounter,
267
        ticker,
268
        maxHeaderListSize,
269
        softLimitHeaderListSize);
270
  }
271

272
  /**
   * Stores the collaborators, installs the inbound frame listener on the decoder, and registers a
   * connection listener that reacts to GOAWAY frames and to streams becoming active or closed.
   */
  private NettyClientHandler(
      Http2ConnectionDecoder decoder,
      Http2ConnectionEncoder encoder,
      Http2Settings settings,
      ChannelLogger negotiationLogger,
      ClientTransportLifecycleManager lifecycleManager,
      KeepAliveManager keepAliveManager,
      Supplier<Stopwatch> stopwatchFactory,
      final Runnable tooManyPingsRunnable,
      TransportTracer transportTracer,
      Attributes eagAttributes,
      String authority,
      boolean autoFlowControl,
      PingLimiter pingLimiter,
      Ticker ticker,
      int maxHeaderListSize,
      int softLimitHeaderListSize) {
    super(
        /* channelUnused= */ null,
        decoder,
        encoder,
        settings,
        negotiationLogger,
        autoFlowControl,
        pingLimiter,
        ticker,
        maxHeaderListSize,
        softLimitHeaderListSize);
    this.lifecycleManager = lifecycleManager;
    this.keepAliveManager = keepAliveManager;
    this.stopwatchFactory = stopwatchFactory;
    this.transportTracer = Preconditions.checkNotNull(transportTracer);
    this.eagAttributes = eagAttributes;
    this.authority = authority;
    // Until protocol negotiation completes, the only attribute exposed is the EAG attributes.
    this.attributes =
        Attributes.newBuilder().set(GrpcAttributes.ATTR_CLIENT_EAG_ATTRS, eagAttributes).build();

    // Set the frame listener on the decoder.
    decoder().frameListener(new FrameListener());

    Http2Connection http2Connection = encoder.connection();
    streamKey = http2Connection.newKey();

    http2Connection.addListener(new Http2ConnectionAdapter() {
      @Override
      public void onGoAwayReceived(int lastStreamId, long errorCode, ByteBuf debugData) {
        byte[] debugBytes = ByteBufUtil.getBytes(debugData);
        goingAway(errorCode, debugBytes);
        if (errorCode != Http2Error.ENHANCE_YOUR_CALM.code()) {
          return;
        }
        String debugText = new String(debugBytes, UTF_8);
        logger.log(
            Level.WARNING, "Received GOAWAY with ENHANCE_YOUR_CALM. Debug data: {0}", debugText);
        if ("too_many_pings".equals(debugText)) {
          tooManyPingsRunnable.run();
        }
      }

      @Override
      public void onStreamActive(Http2Stream stream) {
        // Only the first active stream counts as the transport becoming active.
        boolean firstActiveStream = connection().numActiveStreams() == 1;
        if (firstActiveStream && NettyClientHandler.this.keepAliveManager != null) {
          NettyClientHandler.this.keepAliveManager.onTransportActive();
        }
      }

      @Override
      public void onStreamClosed(Http2Stream stream) {
        // Although streams with CALL_OPTIONS_RPC_OWNED_BY_BALANCER are not marked as "in-use" in
        // the first place, we don't propagate that option here, and it's safe to reset the in-use
        // state for them, which will be a cheap no-op.
        inUseState.updateObjectInUse(stream, false);
        boolean noActiveStreams = connection().numActiveStreams() == 0;
        if (noActiveStreams && NettyClientHandler.this.keepAliveManager != null) {
          NettyClientHandler.this.keepAliveManager.onTransportIdle();
        }
      }
    });
  }
1✔
351

352
  /**
   * Returns the current transport attributes. The constructor seeds them with only
   * {@code GrpcAttributes.ATTR_CLIENT_EAG_ATTRS}; the attributes produced by protocol negotiation
   * are merged in by {@code handleProtocolNegotiationCompleted}.
   */
  Attributes getAttributes() {
    return attributes;
  }
359

360
  /**
361
   * Handler for commands sent from the stream.
362
   */
363
  @Override
364
  public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
365
          throws Exception {
366
    if (msg instanceof CreateStreamCommand) {
1✔
367
      createStream((CreateStreamCommand) msg, promise);
1✔
368
    } else if (msg instanceof SendGrpcFrameCommand) {
1✔
369
      sendGrpcFrame(ctx, (SendGrpcFrameCommand) msg, promise);
1✔
370
    } else if (msg instanceof CancelClientStreamCommand) {
1✔
371
      cancelStream(ctx, (CancelClientStreamCommand) msg, promise);
1✔
372
    } else if (msg instanceof SendPingCommand) {
1✔
373
      sendPingFrame(ctx, (SendPingCommand) msg, promise);
1✔
374
    } else if (msg instanceof GracefulCloseCommand) {
1✔
375
      gracefulClose(ctx, (GracefulCloseCommand) msg, promise);
1✔
376
    } else if (msg instanceof ForcefulCloseCommand) {
1✔
377
      forcefulClose(ctx, (ForcefulCloseCommand) msg, promise);
1✔
378
    } else if (msg == NOOP_MESSAGE) {
1✔
379
      ctx.write(Unpooled.EMPTY_BUFFER, promise);
1✔
380
    } else {
381
      throw new AssertionError("Write called for unexpected type: " + msg.getClass().getName());
×
382
    }
383
  }
1✔
384

385
  /**
   * Creates the {@code WriteQueue} for the given channel and stores it for later retrieval via
   * {@code getWriteQueue()}. Each call replaces any previously created queue.
   */
  void startWriteQueue(Channel channel) {
    clientWriteQueue = new WriteQueue(channel);
  }
1✔
388

389
  /**
   * Returns the write queue created by {@code startWriteQueue}, or {@code null} if that method
   * has not been called yet.
   */
  WriteQueue getWriteQueue() {
    return clientWriteQueue;
  }
392

393
  /** Returns the lifecycle manager this handler was constructed with. */
  ClientTransportLifecycleManager getLifecycleManager() {
    return lifecycleManager;
  }
396

397
  /**
   * Returns the given processed bytes back to inbound flow control.
   *
   * @param stream the HTTP/2 stream the bytes were received on
   * @param bytes the number of bytes that have been processed
   * @throws RuntimeException wrapping the {@link Http2Exception} if the flow controller rejects
   *     the update
   */
  void returnProcessedBytes(Http2Stream stream, int bytes) {
    try {
      decoder().flowController().consumeBytes(stream, bytes);
    } catch (Http2Exception e) {
      // The signature has no checked exception, so the failure is surfaced unchecked.
      throw new RuntimeException(e);
    }
  }
1✔
407

408
  private void onHeadersRead(int streamId, Http2Headers headers, boolean endStream) {
409
    // Stream 1 is reserved for the Upgrade response, so we should ignore its headers here:
410
    if (streamId != Http2CodecUtil.HTTP_UPGRADE_STREAM_ID) {
1✔
411
      NettyClientStream.TransportState stream = clientStream(requireHttp2Stream(streamId));
1✔
412
      PerfMark.event("NettyClientHandler.onHeadersRead", stream.tag());
1✔
413
      // check metadata size vs soft limit
414
      int h2HeadersSize = Utils.getH2HeadersSize(headers);
1✔
415
      boolean shouldFail =
1✔
416
          Utils.shouldRejectOnMetadataSizeSoftLimitExceeded(
1✔
417
              h2HeadersSize, softLimitHeaderListSize, maxHeaderListSize);
418
      if (shouldFail && endStream) {
1✔
419
        stream.transportReportStatus(Status.RESOURCE_EXHAUSTED
×
420
            .withDescription(
×
421
                String.format(
×
422
                    "Server Status + Trailers of size %d exceeded Metadata size soft limit: %d",
423
                    h2HeadersSize,
×
424
                    softLimitHeaderListSize)), true, new Metadata());
×
425
        return;
×
426
      } else if (shouldFail) {
1✔
427
        stream.transportReportStatus(Status.RESOURCE_EXHAUSTED
1✔
428
            .withDescription(
1✔
429
                String.format(
1✔
430
                    "Server Headers of size %d exceeded Metadata size soft limit: %d",
431
                    h2HeadersSize,
1✔
432
                    softLimitHeaderListSize)), true, new Metadata());
1✔
433
        return;
1✔
434
      }
435
      stream.transportHeadersReceived(headers, endStream);
1✔
436
    }
437

438
    if (keepAliveManager != null) {
1✔
439
      keepAliveManager.onDataReceived();
1✔
440
    }
441
  }
1✔
442

443
  /**
444
   * Handler for an inbound HTTP/2 DATA frame.
445
   */
446
  private void onDataRead(int streamId, ByteBuf data, int padding, boolean endOfStream) {
447
    flowControlPing().onDataRead(data.readableBytes(), padding);
1✔
448
    NettyClientStream.TransportState stream = clientStream(requireHttp2Stream(streamId));
1✔
449
    PerfMark.event("NettyClientHandler.onDataRead", stream.tag());
1✔
450
    stream.transportDataReceived(data, endOfStream);
1✔
451
    if (keepAliveManager != null) {
1✔
452
      keepAliveManager.onDataReceived();
1✔
453
    }
454
  }
1✔
455

456
  /**
457
   * Handler for an inbound HTTP/2 RST_STREAM frame, terminating a stream.
458
   */
459
  private void onRstStreamRead(int streamId, long errorCode) {
460
    NettyClientStream.TransportState stream = clientStream(connection().stream(streamId));
1✔
461
    if (stream != null) {
1✔
462
      PerfMark.event("NettyClientHandler.onRstStreamRead", stream.tag());
1✔
463
      Status status = statusFromH2Error(null, "RST_STREAM closed stream", errorCode, null);
1✔
464
      stream.transportReportStatus(
1✔
465
          status,
466
          errorCode == Http2Error.REFUSED_STREAM.code()
1✔
467
              ? RpcProgress.REFUSED : RpcProgress.PROCESSED,
1✔
468
          false /*stop delivery*/,
469
          new Metadata());
470
      if (keepAliveManager != null) {
1✔
471
        keepAliveManager.onDataReceived();
1✔
472
      }
473
    }
474
  }
1✔
475

476
  @Override
477
  public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
478
    logger.fine("Network channel being closed by the application.");
1✔
479
    if (ctx.channel().isActive()) { // Ignore notification that the socket was closed
1✔
480
      lifecycleManager.notifyShutdown(
1✔
481
          Status.UNAVAILABLE.withDescription("Transport closed for unknown reason"));
1✔
482
    }
483
    super.close(ctx, promise);
1✔
484
  }
1✔
485

486
  /**
487
   * Handler for the Channel shutting down.
488
   */
489
  @Override
490
  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
491
    try {
492
      logger.fine("Network channel is closed");
1✔
493
      Status status = Status.UNAVAILABLE.withDescription("Network closed for unknown reason");
1✔
494
      lifecycleManager.notifyShutdown(status);
1✔
495
      final Status streamStatus;
496
      if (channelInactiveReason != null) {
1✔
497
        streamStatus = channelInactiveReason;
1✔
498
      } else {
499
        streamStatus = lifecycleManager.getShutdownStatus();
1✔
500
      }
501
      try {
502
        cancelPing(lifecycleManager.getShutdownStatus());
1✔
503
        // Report status to the application layer for any open streams
504
        connection().forEachActiveStream(new Http2StreamVisitor() {
1✔
505
          @Override
506
          public boolean visit(Http2Stream stream) throws Http2Exception {
507
            NettyClientStream.TransportState clientStream = clientStream(stream);
1✔
508
            if (clientStream != null) {
1✔
509
              clientStream.transportReportStatus(streamStatus, false, new Metadata());
1✔
510
            }
511
            return true;
1✔
512
          }
513
        });
514
      } finally {
515
        lifecycleManager.notifyTerminated(status);
1✔
516
      }
517
    } finally {
518
      // Close any open streams
519
      super.channelInactive(ctx);
1✔
520
      if (keepAliveManager != null) {
1✔
521
        keepAliveManager.onTransportTermination();
1✔
522
      }
523
    }
524
  }
1✔
525

526
  /**
   * Records the outcome of protocol negotiation: merges the negotiated attributes into this
   * handler's attributes, stores the security info, delegates to the superclass, and then asks
   * the channel's {@code WriteBufferingAndExceptionHandler} (if still installed) to write out its
   * buffered writes and remove itself.
   */
  @Override
  public void handleProtocolNegotiationCompleted(
      Attributes attributes, InternalChannelz.Security securityInfo) {
    // Start from the existing attributes so entries set earlier are kept unless overridden.
    this.attributes = this.attributes.toBuilder().setAll(attributes).build();
    this.securityInfo = securityInfo;
    super.handleProtocolNegotiationCompleted(attributes, securityInfo);
    writeBufferingAndRemove(ctx().channel());
  }
1✔
534

535
  static void writeBufferingAndRemove(Channel channel) {
536
    checkNotNull(channel, "channel");
1✔
537
    ChannelHandlerContext handlerCtx =
1✔
538
        channel.pipeline().context(WriteBufferingAndExceptionHandler.class);
1✔
539
    if (handlerCtx == null) {
1✔
540
      return;
1✔
541
    }
542
    ((WriteBufferingAndExceptionHandler) handlerCtx.handler()).writeBufferedAndRemove(handlerCtx);
1✔
543
  }
1✔
544

545
  /** Returns the EAG attributes supplied when this handler was created. */
  @Override
  public Attributes getEagAttributes() {
    return eagAttributes;
  }
549

550
  /**
   * Returns the transport-level authority supplied when this handler was created. RPCs whose
   * authority header equals this value skip per-RPC authority verification in createStream.
   */
  @Override
  public String getAuthority() {
    return authority;
  }
554

555
  /**
   * Returns the security info recorded by {@code handleProtocolNegotiationCompleted}; in the
   * visible code nothing else assigns it, so it is {@code null} before negotiation completes.
   */
  InternalChannelz.Security getSecurityInfo() {
    return securityInfo;
  }
558

559
  @Override
560
  protected void onConnectionError(ChannelHandlerContext ctx,  boolean outbound, Throwable cause,
561
      Http2Exception http2Ex) {
562
    logger.log(Level.FINE, "Caught a connection error", cause);
1✔
563
    lifecycleManager.notifyShutdown(Utils.statusFromThrowable(cause));
1✔
564
    // Parent class will shut down the Channel
565
    super.onConnectionError(ctx, outbound, cause, http2Ex);
1✔
566
  }
1✔
567

568
  @Override
569
  protected void onStreamError(ChannelHandlerContext ctx, boolean outbound, Throwable cause,
570
      Http2Exception.StreamException http2Ex) {
571
    // Close the stream with a status that contains the cause.
572
    NettyClientStream.TransportState stream = clientStream(connection().stream(http2Ex.streamId()));
1✔
573
    if (stream != null) {
1✔
574
      stream.transportReportStatus(Utils.statusFromThrowable(cause), false, new Metadata());
1✔
575
    } else {
576
      logger.log(Level.FINE, "Stream error for unknown stream " + http2Ex.streamId(), cause);
1✔
577
    }
578

579
    // Delegate to the base class to send a RST_STREAM.
580
    super.onStreamError(ctx, outbound, cause, http2Ex);
1✔
581
  }
1✔
582

583
  @Override
584
  protected boolean isGracefulShutdownComplete() {
585
    // Only allow graceful shutdown to complete after all pending streams have completed.
586
    return super.isGracefulShutdownComplete()
1✔
587
        && ((StreamBufferingEncoder) encoder()).numBufferedStreams() == 0;
1✔
588
  }
589

590
  /**
591
   * Attempts to create a new stream from the given command. If there are too many active streams,
592
   * the creation request is queued.
593
   */
594
  private void createStream(CreateStreamCommand command, ChannelPromise promise)
595
          throws Exception {
596
    if (lifecycleManager.getShutdownStatus() != null) {
1✔
597
      command.stream().setNonExistent();
1✔
598
      // The connection is going away (it is really the GOAWAY case),
599
      // just terminate the stream now.
600
      command.stream().transportReportStatus(
1✔
601
          lifecycleManager.getShutdownStatus(), RpcProgress.MISCARRIED, true, new Metadata());
1✔
602
      promise.setFailure(InternalStatus.asRuntimeExceptionWithoutStacktrace(
1✔
603
              lifecycleManager.getShutdownStatus(), null));
1✔
604
      return;
1✔
605
    }
606

607
    CharSequence authorityHeader = command.headers().authority();
1✔
608
    if (authorityHeader == null) {
1✔
609
      Status authorityVerificationStatus = Status.UNAVAILABLE.withDescription(
1✔
610
              "Missing authority header");
611
      command.stream().setNonExistent();
1✔
612
      command.stream().transportReportStatus(
1✔
613
              Status.UNAVAILABLE, RpcProgress.PROCESSED, true, new Metadata());
614
      promise.setFailure(InternalStatus.asRuntimeExceptionWithoutStacktrace(
1✔
615
              authorityVerificationStatus, null));
616
      return;
1✔
617
    }
618
    // No need to verify authority for the rpc outgoing header if it is same as the authority
619
    // for the transport
620
    if (!authority.contentEquals(authorityHeader)) {
1✔
621
      Status authorityVerificationStatus = peerVerificationResults.get(
1✔
622
              authorityHeader.toString());
1✔
623
      if (authorityVerificationStatus == null) {
1✔
624
        if (attributes.get(GrpcAttributes.ATTR_AUTHORITY_VERIFIER) == null) {
1✔
625
          authorityVerificationStatus = Status.UNAVAILABLE.withDescription(
1✔
626
                  "Authority verifier not found to verify authority");
627
          command.stream().setNonExistent();
1✔
628
          command.stream().transportReportStatus(
1✔
629
                  authorityVerificationStatus, RpcProgress.PROCESSED, true, new Metadata());
630
          promise.setFailure(InternalStatus.asRuntimeExceptionWithoutStacktrace(
1✔
631
                  authorityVerificationStatus, null));
632
          return;
1✔
633
        }
634
        authorityVerificationStatus = attributes.get(GrpcAttributes.ATTR_AUTHORITY_VERIFIER)
1✔
635
                .verifyAuthority(authorityHeader.toString());
1✔
636
        peerVerificationResults.put(authorityHeader.toString(), authorityVerificationStatus);
1✔
637
        if (!authorityVerificationStatus.isOk() && !enablePerRpcAuthorityCheck) {
1✔
638
          logger.log(Level.WARNING, String.format("%s.%s",
1✔
639
                          authorityVerificationStatus.getDescription(),
1✔
640
                          enablePerRpcAuthorityCheck
1✔
641
                                  ? "" : " This will be an error in the future."),
1✔
642
                  InternalStatus.asRuntimeExceptionWithoutStacktrace(
1✔
643
                          authorityVerificationStatus, null));
644
        }
645
      }
646
      if (!authorityVerificationStatus.isOk()) {
1✔
647
        if (enablePerRpcAuthorityCheck) {
1✔
648
          command.stream().setNonExistent();
1✔
649
          command.stream().transportReportStatus(
1✔
650
                  authorityVerificationStatus, RpcProgress.PROCESSED, true, new Metadata());
651
          promise.setFailure(InternalStatus.asRuntimeExceptionWithoutStacktrace(
1✔
652
                  authorityVerificationStatus, null));
653
          return;
1✔
654
        }
655
      }
656
    }
657
    // Get the stream ID for the new stream.
658
    int streamId;
659
    try {
660
      streamId = incrementAndGetNextStreamId();
1✔
661
    } catch (StatusException e) {
1✔
662
      command.stream().setNonExistent();
1✔
663
      // Stream IDs have been exhausted for this connection. Fail the promise immediately.
664
      promise.setFailure(e);
1✔
665

666
      // Initiate a graceful shutdown if we haven't already.
667
      if (!connection().goAwaySent()) {
1✔
668
        logger.fine("Stream IDs have been exhausted for this connection. "
1✔
669
                + "Initiating graceful shutdown of the connection.");
670
        lifecycleManager.notifyShutdown(e.getStatus());
1✔
671
        close(ctx(), ctx().newPromise());
1✔
672
      }
673
      return;
1✔
674
    }
1✔
675
    if (connection().goAwayReceived()) {
1✔
676
      Status s = abruptGoAwayStatus;
1✔
677
      int maxActiveStreams = connection().local().maxActiveStreams();
1✔
678
      int lastStreamId = connection().local().lastStreamKnownByPeer();
1✔
679
      if (s == null) {
1✔
680
        // Should be impossible, but handle pseudo-gracefully
681
        s = Status.INTERNAL.withDescription(
×
682
            "Failed due to abrupt GOAWAY, but can't find GOAWAY details");
683
      } else if (streamId > lastStreamId) {
1✔
684
        s = s.augmentDescription(
1✔
685
            "stream id: " + streamId + ", GOAWAY Last-Stream-ID:" + lastStreamId);
686
      } else if (connection().local().numActiveStreams() == maxActiveStreams) {
1✔
687
        s = s.augmentDescription("At MAX_CONCURRENT_STREAMS limit. limit: " + maxActiveStreams);
1✔
688
      }
689
      if (streamId > lastStreamId || connection().local().numActiveStreams() == maxActiveStreams) {
1✔
690
        // This should only be reachable during onGoAwayReceived, as otherwise
691
        // getShutdownThrowable() != null
692
        command.stream().setNonExistent();
1✔
693
        command.stream().transportReportStatus(s, RpcProgress.MISCARRIED, true, new Metadata());
1✔
694
        promise.setFailure(s.asRuntimeException());
1✔
695
        return;
1✔
696
      }
697
    }
698

699
    NettyClientStream.TransportState stream = command.stream();
1✔
700
    Http2Headers headers = command.headers();
1✔
701
    stream.setId(streamId);
1✔
702

703
    try (TaskCloseable ignore = PerfMark.traceTask("NettyClientHandler.createStream")) {
1✔
704
      PerfMark.linkIn(command.getLink());
1✔
705
      PerfMark.attachTag(stream.tag());
1✔
706
      createStreamTraced(
1✔
707
          streamId, stream, headers, command.isGet(), command.shouldBeCountedForInUse(), promise);
1✔
708
    }
709
  }
1✔
710

711
  private void createStreamTraced(
712
      final int streamId,
713
      final NettyClientStream.TransportState stream,
714
      final Http2Headers headers,
715
      boolean isGet,
716
      final boolean shouldBeCountedForInUse,
717
      final ChannelPromise promise) {
718
    // Create an intermediate promise so that we can intercept the failure reported back to the
719
    // application.
720
    ChannelPromise tempPromise = ctx().newPromise();
1✔
721
    encoder().writeHeaders(ctx(), streamId, headers, 0, isGet, tempPromise)
1✔
722
        .addListener(new ChannelFutureListener() {
1✔
723
          @Override
724
          public void operationComplete(ChannelFuture future) throws Exception {
725
            if (future.isSuccess()) {
1✔
726
              // The http2Stream will be null in case a stream buffered in the encoder
727
              // was canceled via RST_STREAM.
728
              Http2Stream http2Stream = connection().stream(streamId);
1✔
729
              if (http2Stream != null) {
1✔
730
                stream.getStatsTraceContext().clientOutboundHeaders();
1✔
731
                http2Stream.setProperty(streamKey, stream);
1✔
732

733
                // This delays the in-use state until the I/O completes, which technically may
734
                // be later than we would like.
735
                if (shouldBeCountedForInUse) {
1✔
736
                  inUseState.updateObjectInUse(http2Stream, true);
1✔
737
                }
738

739
                // Attach the client stream to the HTTP/2 stream object as user data.
740
                stream.setHttp2Stream(http2Stream);
1✔
741
                promise.setSuccess();
1✔
742
              } else {
743
                // Otherwise, the stream has been cancelled and Netty is sending a
744
                // RST_STREAM frame which causes it to purge pending writes from the
745
                // flow-controller and delete the http2Stream. The stream listener has already
746
                // been notified of cancellation so there is nothing to do.
747
                //
748
                // This process has been observed to fail in some circumstances, leaving listeners
749
                // unanswered. Ensure that some exception has been delivered consistent with the
750
                // implied RST_STREAM result above.
751
                Status status = Status.INTERNAL.withDescription("unknown stream for connection");
1✔
752
                promise.setFailure(status.asRuntimeException());
1✔
753
              }
754
            } else {
1✔
755
              Throwable cause = future.cause();
1✔
756
              if (cause instanceof StreamBufferingEncoder.Http2GoAwayException) {
1✔
757
                StreamBufferingEncoder.Http2GoAwayException e =
1✔
758
                    (StreamBufferingEncoder.Http2GoAwayException) cause;
759
                Status status = statusFromH2Error(
1✔
760
                    Status.Code.UNAVAILABLE, "GOAWAY closed buffered stream",
761
                    e.errorCode(), e.debugData());
1✔
762
                cause = status.asRuntimeException();
1✔
763
                stream.transportReportStatus(status, RpcProgress.MISCARRIED, true, new Metadata());
1✔
764
              } else if (cause instanceof StreamBufferingEncoder.Http2ChannelClosedException) {
1✔
765
                Status status = lifecycleManager.getShutdownStatus();
1✔
766
                if (status == null) {
1✔
767
                  status = Status.UNAVAILABLE.withCause(cause)
×
768
                      .withDescription("Connection closed while stream is buffered");
×
769
                }
770
                stream.transportReportStatus(status, RpcProgress.MISCARRIED, true, new Metadata());
1✔
771
              }
772
              promise.setFailure(cause);
1✔
773
            }
774
          }
1✔
775
        });
776
    // When the HEADERS are not buffered because of MAX_CONCURRENT_STREAMS in
777
    // StreamBufferingEncoder, the stream is created immediately even if the bytes of the HEADERS
778
    // are delayed because the OS may have too much buffered and isn't accepting the write. The
779
    // write promise is also delayed until flush(). However, we need to associate the netty stream
780
    // with the transport state so that goingAway() and forcefulClose() and able to notify the
781
    // stream of failures.
782
    //
783
    // This leaves a hole when MAX_CONCURRENT_STREAMS is reached, as http2Stream will be null, but
784
    // it is better than nothing.
785
    Http2Stream http2Stream = connection().stream(streamId);
1✔
786
    if (http2Stream != null) {
1✔
787
      http2Stream.setProperty(streamKey, stream);
1✔
788
    }
789
  }
1✔
790

791
  /**
792
   * Cancels this stream.
793
   */
794
  private void cancelStream(ChannelHandlerContext ctx, CancelClientStreamCommand cmd,
795
      ChannelPromise promise) {
796
    NettyClientStream.TransportState stream = cmd.stream();
1✔
797
    try (TaskCloseable ignore = PerfMark.traceTask("NettyClientHandler.cancelStream")) {
1✔
798
      PerfMark.attachTag(stream.tag());
1✔
799
      PerfMark.linkIn(cmd.getLink());
1✔
800
      Status reason = cmd.reason();
1✔
801
      if (reason != null) {
1✔
802
        stream.transportReportStatus(reason, true, new Metadata());
1✔
803
      }
804
      if (!cmd.stream().isNonExistent()) {
1✔
805
        encoder().writeRstStream(ctx, stream.id(), Http2Error.CANCEL.code(), promise);
1✔
806
      } else {
807
        promise.setSuccess();
1✔
808
      }
809
    }
810
  }
1✔
811

812
  /**
813
   * Sends the given GRPC frame for the stream.
814
   */
815
  private void sendGrpcFrame(ChannelHandlerContext ctx, SendGrpcFrameCommand cmd,
816
      ChannelPromise promise) {
817
    try (TaskCloseable ignore = PerfMark.traceTask("NettyClientHandler.sendGrpcFrame")) {
1✔
818
      PerfMark.attachTag(cmd.stream().tag());
1✔
819
      PerfMark.linkIn(cmd.getLink());
1✔
820
      // Call the base class to write the HTTP/2 DATA frame.
821
      // Note: no need to flush since this is handled by the outbound flow controller.
822
      encoder().writeData(ctx, cmd.stream().id(), cmd.content(), 0, cmd.endStream(), promise);
1✔
823
    }
824
  }
1✔
825

826
  private void sendPingFrame(ChannelHandlerContext ctx, SendPingCommand msg,
827
      ChannelPromise promise) {
828
    try (TaskCloseable ignore = PerfMark.traceTask("NettyClientHandler.sendPingFrame")) {
1✔
829
      PerfMark.linkIn(msg.getLink());
1✔
830
      sendPingFrameTraced(ctx, msg, promise);
1✔
831
    }
832
  }
1✔
833

834
  /**
835
   * Sends a PING frame. If a ping operation is already outstanding, the callback in the message is
836
   * registered to be called when the existing operation completes, and no new frame is sent.
837
   */
838
  private void sendPingFrameTraced(ChannelHandlerContext ctx, SendPingCommand msg,
839
      ChannelPromise promise) {
840
    // Don't check lifecycleManager.getShutdownStatus() since we want to allow pings after shutdown
841
    // but before termination. After termination, messages will no longer arrive because the
842
    // pipeline clears all handlers on channel close.
843

844
    PingCallback callback = msg.callback();
1✔
845
    Executor executor = msg.executor();
1✔
846
    // we only allow one outstanding ping at a time, so just add the callback to
847
    // any outstanding operation
848
    if (ping != null) {
1✔
849
      promise.setSuccess();
1✔
850
      ping.addCallback(callback, executor);
1✔
851
      return;
1✔
852
    }
853

854
    // Use a new promise to prevent calling the callback twice on write failure: here and in
855
    // NettyClientTransport.ping(). It may appear strange, but it will behave the same as if
856
    // ping != null above.
857
    promise.setSuccess();
1✔
858
    promise = ctx().newPromise();
1✔
859
    // set outstanding operation
860
    long data = USER_PING_PAYLOAD;
1✔
861
    Stopwatch stopwatch = stopwatchFactory.get();
1✔
862
    stopwatch.start();
1✔
863
    ping = new Http2Ping(data, stopwatch);
1✔
864
    ping.addCallback(callback, executor);
1✔
865
    // and then write the ping
866
    encoder().writePing(ctx, false, USER_PING_PAYLOAD, promise);
1✔
867
    ctx.flush();
1✔
868
    final Http2Ping finalPing = ping;
1✔
869
    promise.addListener(new ChannelFutureListener() {
1✔
870
      @Override
871
      public void operationComplete(ChannelFuture future) throws Exception {
872
        if (future.isSuccess()) {
1✔
873
          transportTracer.reportKeepAliveSent();
1✔
874
          return;
1✔
875
        }
876
        Throwable cause = future.cause();
×
877
        Status status = lifecycleManager.getShutdownStatus();
×
878
        if (cause instanceof ClosedChannelException) {
×
879
          if (status == null) {
×
880
            status = Status.UNKNOWN.withDescription("Ping failed but for unknown reason.")
×
881
                    .withCause(future.cause());
×
882
          }
883
        } else {
884
          status = Utils.statusFromThrowable(cause);
×
885
        }
886
        finalPing.failed(status);
×
887
        if (ping == finalPing) {
×
888
          ping = null;
×
889
        }
890
      }
×
891
    });
892
  }
1✔
893

894
  private void gracefulClose(ChannelHandlerContext ctx, GracefulCloseCommand msg,
895
      ChannelPromise promise) throws Exception {
896
    lifecycleManager.notifyShutdown(msg.getStatus());
1✔
897
    // Explicitly flush to create any buffered streams before sending GOAWAY.
898
    // TODO(ejona): determine if the need to flush is a bug in Netty
899
    flush(ctx);
1✔
900
    close(ctx, promise);
1✔
901
  }
1✔
902

903
  private void forcefulClose(final ChannelHandlerContext ctx, final ForcefulCloseCommand msg,
904
      ChannelPromise promise) throws Exception {
905
    connection().forEachActiveStream(new Http2StreamVisitor() {
1✔
906
      @Override
907
      public boolean visit(Http2Stream stream) throws Http2Exception {
908
        NettyClientStream.TransportState clientStream = clientStream(stream);
1✔
909
        Tag tag = clientStream != null ? clientStream.tag() : PerfMark.createTag();
1✔
910
        try (TaskCloseable ignore = PerfMark.traceTask("NettyClientHandler.forcefulClose")) {
1✔
911
          PerfMark.linkIn(msg.getLink());
1✔
912
          PerfMark.attachTag(tag);
1✔
913
          if (clientStream != null) {
1✔
914
            clientStream.transportReportStatus(msg.getStatus(), true, new Metadata());
1✔
915
            resetStream(ctx, stream.id(), Http2Error.CANCEL.code(), ctx.newPromise());
1✔
916
          }
917
          stream.close();
1✔
918
          return true;
1✔
919
        }
920
      }
921
    });
922
    close(ctx, promise);
1✔
923
  }
1✔
924

925
  /**
926
   * Handler for a GOAWAY being received. Fails any streams created after the
927
   * last known stream. May only be called during a read.
928
   */
929
  private void goingAway(long errorCode, byte[] debugData) {
930
    Status finalStatus = statusFromH2Error(
1✔
931
        Status.Code.UNAVAILABLE, "GOAWAY shut down transport", errorCode, debugData);
932
    lifecycleManager.notifyGracefulShutdown(finalStatus);
1✔
933
    abruptGoAwayStatus = statusFromH2Error(
1✔
934
        Status.Code.UNAVAILABLE, "Abrupt GOAWAY closed unsent stream", errorCode, debugData);
935
    // While this _should_ be UNAVAILABLE, Netty uses the wrong stream id in the GOAWAY when it
936
    // fails streams due to HPACK failures (e.g., header list too large). To be more conservative,
937
    // we assume any sent streams may be related to the GOAWAY. This should rarely impact users
938
    // since the main time servers should use abrupt GOAWAYs is if there is a protocol error, and if
939
    // there wasn't a protocol error the error code was probably NO_ERROR which is mapped to
940
    // UNAVAILABLE. https://github.com/netty/netty/issues/10670
941
    final Status abruptGoAwayStatusConservative = statusFromH2Error(
1✔
942
        null, "Abrupt GOAWAY closed sent stream", errorCode, debugData);
943
    final boolean mayBeHittingNettyBug = errorCode != Http2Error.NO_ERROR.code();
1✔
944
    // Try to allocate as many in-flight streams as possible, to reduce race window of
945
    // https://github.com/grpc/grpc-java/issues/2562 . To be of any help, the server has to
946
    // gracefully shut down the connection with two GOAWAYs. gRPC servers generally send a PING
947
    // after the first GOAWAY, so they can very precisely detect when the GOAWAY has been
948
    // processed and thus this processing must be in-line before processing additional reads.
949

950
    // This can cause reentrancy, but should be minor since it is normal to handle writes in
951
    // response to a read. Also, the call stack is rather shallow at this point
952
    clientWriteQueue.drainNow();
1✔
953
    if (lifecycleManager.notifyShutdown(finalStatus)) {
1✔
954
      // This is for the only RPCs that are actually covered by the GOAWAY error code. All other
955
      // RPCs were not observed by the remote and so should be UNAVAILABLE.
956
      channelInactiveReason = statusFromH2Error(
1✔
957
          null, "Connection closed after GOAWAY", errorCode, debugData);
958
    }
959

960
    final int lastKnownStream = connection().local().lastStreamKnownByPeer();
1✔
961
    try {
962
      connection().forEachActiveStream(new Http2StreamVisitor() {
1✔
963
        @Override
964
        public boolean visit(Http2Stream stream) throws Http2Exception {
965
          if (stream.id() > lastKnownStream) {
1✔
966
            NettyClientStream.TransportState clientStream = clientStream(stream);
1✔
967
            if (clientStream != null) {
1✔
968
              // RpcProgress _should_ be REFUSED, but are being conservative. See comment for
969
              // abruptGoAwayStatusConservative. This does reduce our ability to perform transparent
970
              // retries, but only if something else caused a connection failure.
971
              RpcProgress progress = mayBeHittingNettyBug
1✔
972
                  ? RpcProgress.PROCESSED
1✔
973
                  : RpcProgress.REFUSED;
1✔
974
              clientStream.transportReportStatus(
1✔
975
                  abruptGoAwayStatusConservative, progress, false, new Metadata());
976
            }
977
            stream.close();
1✔
978
          }
979
          return true;
1✔
980
        }
981
      });
982
    } catch (Http2Exception e) {
×
983
      throw new RuntimeException(e);
×
984
    }
1✔
985
  }
1✔
986

987
  private void cancelPing(Status s) {
988
    if (ping != null) {
1✔
989
      ping.failed(s);
1✔
990
      ping = null;
1✔
991
    }
992
  }
1✔
993

994
  /** If {@code statusCode} is non-null, it will be used instead of the http2 error code mapping. */
995
  private Status statusFromH2Error(
996
      Status.Code statusCode, String context, long errorCode, byte[] debugData) {
997
    Status status = GrpcUtil.Http2Error.statusForCode(errorCode);
1✔
998
    if (statusCode == null) {
1✔
999
      statusCode = status.getCode();
1✔
1000
    }
1001
    String debugString = "";
1✔
1002
    if (debugData != null && debugData.length > 0) {
1✔
1003
      // If a debug message was provided, use it.
1004
      debugString = ", debug data: " + new String(debugData, UTF_8);
1✔
1005
    }
1006
    return statusCode.toStatus()
1✔
1007
        .withDescription(context + ". " + status.getDescription() + debugString);
1✔
1008
  }
1009

1010
  /**
1011
   * Gets the client stream associated to the given HTTP/2 stream object.
1012
   */
1013
  private NettyClientStream.TransportState clientStream(Http2Stream stream) {
1014
    return stream == null ? null : (NettyClientStream.TransportState) stream.getProperty(streamKey);
1✔
1015
  }
1016

1017
  private int incrementAndGetNextStreamId() throws StatusException {
1018
    int nextStreamId = connection().local().incrementAndGetNextStreamId();
1✔
1019
    if (nextStreamId < 0) {
1✔
1020
      logger.fine("Stream IDs have been exhausted for this connection. "
1✔
1021
              + "Initiating graceful shutdown of the connection.");
1022
      throw EXHAUSTED_STREAMS_STATUS.asException();
1✔
1023
    }
1024
    return nextStreamId;
1✔
1025
  }
1026

1027
  private Http2Stream requireHttp2Stream(int streamId) {
1028
    Http2Stream stream = connection().stream(streamId);
1✔
1029
    if (stream == null) {
1✔
1030
      // This should never happen.
1031
      throw new AssertionError("Stream does not exist: " + streamId);
×
1032
    }
1033
    return stream;
1✔
1034
  }
1035

1036
  private class FrameListener extends Http2FrameAdapter {
1✔
1037
    private boolean firstSettings = true;
1✔
1038

1039
    @Override
1040
    public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) {
1041
      if (firstSettings) {
1✔
1042
        firstSettings = false;
1✔
1043
        attributes = lifecycleManager.filterAttributes(attributes);
1✔
1044
        lifecycleManager.notifyReady();
1✔
1045
      }
1046
    }
1✔
1047

1048
    @Override
1049
    public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
1050
        boolean endOfStream) throws Http2Exception {
1051
      NettyClientHandler.this.onDataRead(streamId, data, padding, endOfStream);
1✔
1052
      return padding;
1✔
1053
    }
1054

1055
    @Override
1056
    public void onHeadersRead(ChannelHandlerContext ctx,
1057
        int streamId,
1058
        Http2Headers headers,
1059
        int streamDependency,
1060
        short weight,
1061
        boolean exclusive,
1062
        int padding,
1063
        boolean endStream) throws Http2Exception {
1064
      NettyClientHandler.this.onHeadersRead(streamId, headers, endStream);
1✔
1065
    }
1✔
1066

1067
    @Override
1068
    public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode)
1069
        throws Http2Exception {
1070
      NettyClientHandler.this.onRstStreamRead(streamId, errorCode);
1✔
1071
    }
1✔
1072

1073
    @Override
1074
    public void onPingAckRead(ChannelHandlerContext ctx, long ackPayload) throws Http2Exception {
1075
      Http2Ping p = ping;
1✔
1076
      if (ackPayload == flowControlPing().payload()) {
1✔
1077
        flowControlPing().updateWindow();
1✔
1078
        logger.log(Level.FINE, "Window: {0}",
1✔
1079
            decoder().flowController().initialWindowSize(connection().connectionStream()));
1✔
1080
      } else if (p != null) {
1✔
1081
        if (p.payload() == ackPayload) {
1✔
1082
          p.complete();
1✔
1083
          ping = null;
1✔
1084
        } else {
1085
          logger.log(Level.WARNING,
1✔
1086
              "Received unexpected ping ack. Expecting {0}, got {1}",
1087
              new Object[] {p.payload(), ackPayload});
1✔
1088
        }
1089
      } else {
1090
        logger.warning("Received unexpected ping ack. No ping outstanding");
×
1091
      }
1092
      if (keepAliveManager != null) {
1✔
1093
        keepAliveManager.onDataReceived();
1✔
1094
      }
1095
    }
1✔
1096

1097
    @Override
1098
    public void onPingRead(ChannelHandlerContext ctx, long data) throws Http2Exception {
1099
      if (keepAliveManager != null) {
1✔
1100
        keepAliveManager.onDataReceived();
×
1101
      }
1102
    }
1✔
1103
  }
1104

1105
  private static class PingCountingFrameWriter extends DecoratingHttp2FrameWriter
1106
      implements AbstractNettyHandler.PingLimiter {
1107
    private int pingCount;
1108

1109
    public PingCountingFrameWriter(Http2FrameWriter delegate) {
1110
      super(delegate);
1✔
1111
    }
1✔
1112

1113
    @Override
1114
    public boolean isPingAllowed() {
1115
      // "3 strikes" may cause the server to complain, so we limit ourselves to 2 or below.
1116
      return pingCount < 2;
1✔
1117
    }
1118

1119
    @Override
1120
    public ChannelFuture writeHeaders(
1121
        ChannelHandlerContext ctx, int streamId, Http2Headers headers,
1122
        int padding, boolean endStream, ChannelPromise promise) {
1123
      pingCount = 0;
×
1124
      return super.writeHeaders(ctx, streamId, headers, padding, endStream, promise);
×
1125
    }
1126

1127
    @Override
1128
    public ChannelFuture writeHeaders(
1129
        ChannelHandlerContext ctx, int streamId, Http2Headers headers,
1130
        int streamDependency, short weight, boolean exclusive,
1131
        int padding, boolean endStream, ChannelPromise promise) {
1132
      pingCount = 0;
1✔
1133
      return super.writeHeaders(ctx, streamId, headers, streamDependency, weight, exclusive,
1✔
1134
          padding, endStream, promise);
1135
    }
1136

1137
    @Override
1138
    public ChannelFuture writeWindowUpdate(
1139
        ChannelHandlerContext ctx, int streamId, int windowSizeIncrement, ChannelPromise promise) {
1140
      pingCount = 0;
1✔
1141
      return super.writeWindowUpdate(ctx, streamId, windowSizeIncrement, promise);
1✔
1142
    }
1143

1144
    @Override
1145
    public ChannelFuture writePing(
1146
        ChannelHandlerContext ctx, boolean ack, long data, ChannelPromise promise) {
1147
      if (!ack) {
1✔
1148
        pingCount++;
1✔
1149
      }
1150
      return super.writePing(ctx, ack, data, promise);
1✔
1151
    }
1152

1153
    @Override
1154
    public ChannelFuture writeData(
1155
        ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endStream,
1156
        ChannelPromise promise) {
1157
      if (data.isReadable()) {
1✔
1158
        pingCount = 0;
1✔
1159
      }
1160
      return super.writeData(ctx, streamId, data, padding, endStream, promise);
1✔
1161
    }
1162
  }
1163
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc