• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19741

19 Mar 2025 03:57AM UTC coverage: 88.599% (+0.01%) from 88.588%
#19741

push

github

web-flow
refactor: Stops exception allocation on channel shutdown

This fixes #11955.

Stops exception allocation and its propagation on channel shutdown.

34588 of 39039 relevant lines covered (88.6%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.48
/../netty/src/main/java/io/grpc/netty/NettyClientHandler.java
1
/*
2
 * Copyright 2014 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.netty;
18

19
import static io.netty.handler.codec.http2.DefaultHttp2LocalFlowController.DEFAULT_WINDOW_UPDATE_RATIO;
20
import static io.netty.util.CharsetUtil.UTF_8;
21
import static io.netty.util.internal.ObjectUtil.checkNotNull;
22

23
import com.google.common.annotations.VisibleForTesting;
24
import com.google.common.base.Preconditions;
25
import com.google.common.base.Stopwatch;
26
import com.google.common.base.Supplier;
27
import com.google.common.base.Ticker;
28
import io.grpc.Attributes;
29
import io.grpc.ChannelLogger;
30
import io.grpc.InternalChannelz;
31
import io.grpc.InternalStatus;
32
import io.grpc.Metadata;
33
import io.grpc.Status;
34
import io.grpc.StatusException;
35
import io.grpc.internal.ClientStreamListener.RpcProgress;
36
import io.grpc.internal.ClientTransport.PingCallback;
37
import io.grpc.internal.GrpcAttributes;
38
import io.grpc.internal.GrpcUtil;
39
import io.grpc.internal.Http2Ping;
40
import io.grpc.internal.InUseStateAggregator;
41
import io.grpc.internal.KeepAliveManager;
42
import io.grpc.internal.TransportTracer;
43
import io.grpc.netty.GrpcHttp2HeadersUtils.GrpcHttp2ClientHeadersDecoder;
44
import io.netty.buffer.ByteBuf;
45
import io.netty.buffer.ByteBufUtil;
46
import io.netty.buffer.Unpooled;
47
import io.netty.channel.Channel;
48
import io.netty.channel.ChannelFuture;
49
import io.netty.channel.ChannelFutureListener;
50
import io.netty.channel.ChannelHandlerContext;
51
import io.netty.channel.ChannelPromise;
52
import io.netty.handler.codec.http2.DecoratingHttp2FrameWriter;
53
import io.netty.handler.codec.http2.DefaultHttp2Connection;
54
import io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder;
55
import io.netty.handler.codec.http2.DefaultHttp2ConnectionEncoder;
56
import io.netty.handler.codec.http2.DefaultHttp2FrameReader;
57
import io.netty.handler.codec.http2.DefaultHttp2FrameWriter;
58
import io.netty.handler.codec.http2.DefaultHttp2HeadersEncoder;
59
import io.netty.handler.codec.http2.DefaultHttp2LocalFlowController;
60
import io.netty.handler.codec.http2.DefaultHttp2RemoteFlowController;
61
import io.netty.handler.codec.http2.Http2CodecUtil;
62
import io.netty.handler.codec.http2.Http2Connection;
63
import io.netty.handler.codec.http2.Http2ConnectionAdapter;
64
import io.netty.handler.codec.http2.Http2ConnectionDecoder;
65
import io.netty.handler.codec.http2.Http2ConnectionEncoder;
66
import io.netty.handler.codec.http2.Http2Error;
67
import io.netty.handler.codec.http2.Http2Exception;
68
import io.netty.handler.codec.http2.Http2FrameAdapter;
69
import io.netty.handler.codec.http2.Http2FrameLogger;
70
import io.netty.handler.codec.http2.Http2FrameReader;
71
import io.netty.handler.codec.http2.Http2FrameWriter;
72
import io.netty.handler.codec.http2.Http2Headers;
73
import io.netty.handler.codec.http2.Http2HeadersDecoder;
74
import io.netty.handler.codec.http2.Http2HeadersEncoder;
75
import io.netty.handler.codec.http2.Http2InboundFrameLogger;
76
import io.netty.handler.codec.http2.Http2OutboundFrameLogger;
77
import io.netty.handler.codec.http2.Http2Settings;
78
import io.netty.handler.codec.http2.Http2Stream;
79
import io.netty.handler.codec.http2.Http2StreamVisitor;
80
import io.netty.handler.codec.http2.StreamBufferingEncoder;
81
import io.netty.handler.codec.http2.UniformStreamByteDistributor;
82
import io.netty.handler.logging.LogLevel;
83
import io.perfmark.PerfMark;
84
import io.perfmark.Tag;
85
import io.perfmark.TaskCloseable;
86
import java.nio.channels.ClosedChannelException;
87
import java.util.LinkedHashMap;
88
import java.util.Map;
89
import java.util.concurrent.Executor;
90
import java.util.logging.Level;
91
import java.util.logging.Logger;
92
import javax.annotation.Nullable;
93

94
/**
95
 * Client-side Netty handler for GRPC processing. All event handlers are executed entirely within
96
 * the context of the Netty Channel thread.
97
 */
98
class NettyClientHandler extends AbstractNettyHandler {
99
  private static final Logger logger = Logger.getLogger(NettyClientHandler.class.getName());
1✔
100
  static boolean enablePerRpcAuthorityCheck =
1✔
101
      GrpcUtil.getFlag("GRPC_ENABLE_PER_RPC_AUTHORITY_CHECK", false);
1✔
102

103
  /**
104
   * A message that simply passes through the channel without any real processing. It is useful to
105
   * check if buffers have been drained and test the health of the channel in a single operation.
106
   */
107
  static final Object NOOP_MESSAGE = new Object();
1✔
108

109
  /**
110
   * Status used when the transport has exhausted the number of streams.
111
   */
112
  private static final Status EXHAUSTED_STREAMS_STATUS =
1✔
113
          Status.UNAVAILABLE.withDescription("Stream IDs have been exhausted");
1✔
114
  private static final long USER_PING_PAYLOAD = 1111;
115

116
  private final Http2Connection.PropertyKey streamKey;
117
  private final ClientTransportLifecycleManager lifecycleManager;
118
  private final KeepAliveManager keepAliveManager;
119
  // Returns new unstarted stopwatches
120
  private final Supplier<Stopwatch> stopwatchFactory;
121
  private final TransportTracer transportTracer;
122
  private final Attributes eagAttributes;
123
  private final String authority;
124
  private final InUseStateAggregator<Http2Stream> inUseState =
1✔
125
      new InUseStateAggregator<Http2Stream>() {
1✔
126
        @Override
127
        protected void handleInUse() {
128
          lifecycleManager.notifyInUse(true);
1✔
129
        }
1✔
130

131
        @Override
132
        protected void handleNotInUse() {
133
          lifecycleManager.notifyInUse(false);
1✔
134
        }
1✔
135
      };
136
  private final Map<String, Status> peerVerificationResults =
1✔
137
      new LinkedHashMap<String, Status>() {
1✔
138
        @Override
139
        protected boolean removeEldestEntry(Map.Entry<String, Status> eldest) {
140
          return size() > 100;
1✔
141
        }
142
      };
143

144
  private WriteQueue clientWriteQueue;
145
  private Http2Ping ping;
146
  private Attributes attributes;
147
  private InternalChannelz.Security securityInfo;
148
  private Status abruptGoAwayStatus;
149
  private Status channelInactiveReason;
150

151
  static NettyClientHandler newHandler(
152
      ClientTransportLifecycleManager lifecycleManager,
153
      @Nullable KeepAliveManager keepAliveManager,
154
      boolean autoFlowControl,
155
      int flowControlWindow,
156
      int maxHeaderListSize,
157
      int softLimitHeaderListSize,
158
      Supplier<Stopwatch> stopwatchFactory,
159
      Runnable tooManyPingsRunnable,
160
      TransportTracer transportTracer,
161
      Attributes eagAttributes,
162
      String authority,
163
      ChannelLogger negotiationLogger,
164
      Ticker ticker) {
165
    Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive");
1✔
166
    Http2HeadersDecoder headersDecoder = new GrpcHttp2ClientHeadersDecoder(maxHeaderListSize);
1✔
167
    Http2FrameReader frameReader = new DefaultHttp2FrameReader(headersDecoder);
1✔
168
    Http2HeadersEncoder encoder = new DefaultHttp2HeadersEncoder(
1✔
169
        Http2HeadersEncoder.NEVER_SENSITIVE, false, 16, Integer.MAX_VALUE);
170
    Http2FrameWriter frameWriter = new DefaultHttp2FrameWriter(encoder);
1✔
171
    Http2Connection connection = new DefaultHttp2Connection(false);
1✔
172
    UniformStreamByteDistributor dist = new UniformStreamByteDistributor(connection);
1✔
173
    dist.minAllocationChunk(MIN_ALLOCATED_CHUNK); // Increased for benchmarks performance.
1✔
174
    DefaultHttp2RemoteFlowController controller =
1✔
175
        new DefaultHttp2RemoteFlowController(connection, dist);
176
    connection.remote().flowController(controller);
1✔
177

178
    return newHandler(
1✔
179
        connection,
180
        frameReader,
181
        frameWriter,
182
        lifecycleManager,
183
        keepAliveManager,
184
        autoFlowControl,
185
        flowControlWindow,
186
        maxHeaderListSize,
187
        softLimitHeaderListSize,
188
        stopwatchFactory,
189
        tooManyPingsRunnable,
190
        transportTracer,
191
        eagAttributes,
192
        authority,
193
        negotiationLogger,
194
        ticker);
195
  }
196

197
  @VisibleForTesting
198
  static NettyClientHandler newHandler(
199
      final Http2Connection connection,
200
      Http2FrameReader frameReader,
201
      Http2FrameWriter frameWriter,
202
      ClientTransportLifecycleManager lifecycleManager,
203
      KeepAliveManager keepAliveManager,
204
      boolean autoFlowControl,
205
      int flowControlWindow,
206
      int maxHeaderListSize,
207
      int softLimitHeaderListSize,
208
      Supplier<Stopwatch> stopwatchFactory,
209
      Runnable tooManyPingsRunnable,
210
      TransportTracer transportTracer,
211
      Attributes eagAttributes,
212
      String authority,
213
      ChannelLogger negotiationLogger,
214
      Ticker ticker) {
215
    Preconditions.checkNotNull(connection, "connection");
1✔
216
    Preconditions.checkNotNull(frameReader, "frameReader");
1✔
217
    Preconditions.checkNotNull(lifecycleManager, "lifecycleManager");
1✔
218
    Preconditions.checkArgument(flowControlWindow > 0, "flowControlWindow must be positive");
1✔
219
    Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive");
1✔
220
    Preconditions.checkArgument(softLimitHeaderListSize > 0,
1✔
221
        "softLimitHeaderListSize must be positive");
222
    Preconditions.checkNotNull(stopwatchFactory, "stopwatchFactory");
1✔
223
    Preconditions.checkNotNull(tooManyPingsRunnable, "tooManyPingsRunnable");
1✔
224
    Preconditions.checkNotNull(eagAttributes, "eagAttributes");
1✔
225
    Preconditions.checkNotNull(authority, "authority");
1✔
226

227
    Http2FrameLogger frameLogger = new Http2FrameLogger(LogLevel.DEBUG, NettyClientHandler.class);
1✔
228
    frameReader = new Http2InboundFrameLogger(frameReader, frameLogger);
1✔
229
    frameWriter = new Http2OutboundFrameLogger(frameWriter, frameLogger);
1✔
230

231
    PingCountingFrameWriter pingCounter;
232
    frameWriter = pingCounter = new PingCountingFrameWriter(frameWriter);
1✔
233

234
    StreamBufferingEncoder encoder =
1✔
235
        new StreamBufferingEncoder(
236
            new DefaultHttp2ConnectionEncoder(connection, frameWriter));
237

238
    // Create the local flow controller configured to auto-refill the connection window.
239
    connection.local().flowController(
1✔
240
        new DefaultHttp2LocalFlowController(connection, DEFAULT_WINDOW_UPDATE_RATIO, true));
241

242
    Http2ConnectionDecoder decoder = new DefaultHttp2ConnectionDecoder(connection, encoder,
1✔
243
        frameReader);
244

245
    transportTracer.setFlowControlWindowReader(new Utils.FlowControlReader(connection));
1✔
246

247
    Http2Settings settings = new Http2Settings();
1✔
248
    settings.pushEnabled(false);
1✔
249
    settings.initialWindowSize(flowControlWindow);
1✔
250
    settings.maxConcurrentStreams(0);
1✔
251
    settings.maxHeaderListSize(maxHeaderListSize);
1✔
252

253
    return new NettyClientHandler(
1✔
254
        decoder,
255
        encoder,
256
        settings,
257
        negotiationLogger,
258
        lifecycleManager,
259
        keepAliveManager,
260
        stopwatchFactory,
261
        tooManyPingsRunnable,
262
        transportTracer,
263
        eagAttributes,
264
        authority,
265
        autoFlowControl,
266
        pingCounter,
267
        ticker,
268
        maxHeaderListSize,
269
        softLimitHeaderListSize);
270
  }
271

272
  /**
   * Stores the collaborators, seeds {@code attributes} with the EAG attributes, installs the
   * inbound frame listener, and registers a connection listener that reacts to GOAWAY frames and
   * to streams becoming active or closed.
   */
  private NettyClientHandler(
      Http2ConnectionDecoder decoder,
      Http2ConnectionEncoder encoder,
      Http2Settings settings,
      ChannelLogger negotiationLogger,
      ClientTransportLifecycleManager lifecycleManager,
      KeepAliveManager keepAliveManager,
      Supplier<Stopwatch> stopwatchFactory,
      final Runnable tooManyPingsRunnable,
      TransportTracer transportTracer,
      Attributes eagAttributes,
      String authority,
      boolean autoFlowControl,
      PingLimiter pingLimiter,
      Ticker ticker,
      int maxHeaderListSize,
      int softLimitHeaderListSize) {
    super(
        /* channelUnused= */ null,
        decoder,
        encoder,
        settings,
        negotiationLogger,
        autoFlowControl,
        pingLimiter,
        ticker,
        maxHeaderListSize,
        softLimitHeaderListSize);
    this.lifecycleManager = lifecycleManager;
    this.keepAliveManager = keepAliveManager;
    this.stopwatchFactory = stopwatchFactory;
    this.transportTracer = Preconditions.checkNotNull(transportTracer);
    this.eagAttributes = eagAttributes;
    this.authority = authority;
    this.attributes =
        Attributes.newBuilder().set(GrpcAttributes.ATTR_CLIENT_EAG_ATTRS, eagAttributes).build();

    // Set the frame listener on the decoder.
    decoder().frameListener(new FrameListener());

    Http2Connection h2Connection = encoder.connection();
    streamKey = h2Connection.newKey();

    h2Connection.addListener(new Http2ConnectionAdapter() {
      @Override
      public void onGoAwayReceived(int lastStreamId, long errorCode, ByteBuf debugData) {
        byte[] rawDebugData = ByteBufUtil.getBytes(debugData);
        goingAway(errorCode, rawDebugData);
        if (errorCode != Http2Error.ENHANCE_YOUR_CALM.code()) {
          return;
        }
        String debugText = new String(rawDebugData, UTF_8);
        logger.log(
            Level.WARNING, "Received GOAWAY with ENHANCE_YOUR_CALM. Debug data: {0}", debugText);
        if ("too_many_pings".equals(debugText)) {
          tooManyPingsRunnable.run();
        }
      }

      @Override
      public void onStreamActive(Http2Stream stream) {
        // Qualified access: the constructor parameter of the same name shadows the field here.
        KeepAliveManager manager = NettyClientHandler.this.keepAliveManager;
        if (connection().numActiveStreams() == 1 && manager != null) {
          manager.onTransportActive();
        }
      }

      @Override
      public void onStreamClosed(Http2Stream stream) {
        // Although streams with CALL_OPTIONS_RPC_OWNED_BY_BALANCER are not marked as "in-use" in
        // the first place, we don't propagate that option here, and it's safe to reset the in-use
        // state for them, which will be a cheap no-op.
        inUseState.updateObjectInUse(stream, false);
        KeepAliveManager manager = NettyClientHandler.this.keepAliveManager;
        if (connection().numActiveStreams() == 0 && manager != null) {
          manager.onTransportIdle();
        }
      }
    });
  }
1✔
351

352
  /**
353
   * The protocol negotiation attributes, available once the protocol negotiation completes;
354
   * otherwise returns {@code Attributes.EMPTY}.
355
   */
356
  Attributes getAttributes() {
357
    return attributes;
1✔
358
  }
359

360
  /**
361
   * Handler for commands sent from the stream.
362
   */
363
  @Override
364
  public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
365
          throws Exception {
366
    if (msg instanceof CreateStreamCommand) {
1✔
367
      createStream((CreateStreamCommand) msg, promise);
1✔
368
    } else if (msg instanceof SendGrpcFrameCommand) {
1✔
369
      sendGrpcFrame(ctx, (SendGrpcFrameCommand) msg, promise);
1✔
370
    } else if (msg instanceof CancelClientStreamCommand) {
1✔
371
      cancelStream(ctx, (CancelClientStreamCommand) msg, promise);
1✔
372
    } else if (msg instanceof SendPingCommand) {
1✔
373
      sendPingFrame(ctx, (SendPingCommand) msg, promise);
1✔
374
    } else if (msg instanceof GracefulCloseCommand) {
1✔
375
      gracefulClose(ctx, (GracefulCloseCommand) msg, promise);
1✔
376
    } else if (msg instanceof ForcefulCloseCommand) {
1✔
377
      forcefulClose(ctx, (ForcefulCloseCommand) msg, promise);
1✔
378
    } else if (msg == NOOP_MESSAGE) {
1✔
379
      ctx.write(Unpooled.EMPTY_BUFFER, promise);
1✔
380
    } else {
381
      throw new AssertionError("Write called for unexpected type: " + msg.getClass().getName());
×
382
    }
383
  }
1✔
384

385
  void startWriteQueue(Channel channel) {
386
    clientWriteQueue = new WriteQueue(channel);
1✔
387
  }
1✔
388

389
  WriteQueue getWriteQueue() {
390
    return clientWriteQueue;
1✔
391
  }
392

393
  ClientTransportLifecycleManager getLifecycleManager() {
394
    return lifecycleManager;
1✔
395
  }
396

397
  /**
398
   * Returns the given processed bytes back to inbound flow control.
399
   */
400
  void returnProcessedBytes(Http2Stream stream, int bytes) {
401
    try {
402
      decoder().flowController().consumeBytes(stream, bytes);
1✔
403
    } catch (Http2Exception e) {
×
404
      throw new RuntimeException(e);
×
405
    }
1✔
406
  }
1✔
407

408
  private void onHeadersRead(int streamId, Http2Headers headers, boolean endStream) {
409
    // Stream 1 is reserved for the Upgrade response, so we should ignore its headers here:
410
    if (streamId != Http2CodecUtil.HTTP_UPGRADE_STREAM_ID) {
1✔
411
      NettyClientStream.TransportState stream = clientStream(requireHttp2Stream(streamId));
1✔
412
      PerfMark.event("NettyClientHandler.onHeadersRead", stream.tag());
1✔
413
      // check metadata size vs soft limit
414
      int h2HeadersSize = Utils.getH2HeadersSize(headers);
1✔
415
      boolean shouldFail =
1✔
416
          Utils.shouldRejectOnMetadataSizeSoftLimitExceeded(
1✔
417
              h2HeadersSize, softLimitHeaderListSize, maxHeaderListSize);
418
      if (shouldFail && endStream) {
1✔
419
        stream.transportReportStatus(Status.RESOURCE_EXHAUSTED
×
420
            .withDescription(
×
421
                String.format(
×
422
                    "Server Status + Trailers of size %d exceeded Metadata size soft limit: %d",
423
                    h2HeadersSize,
×
424
                    softLimitHeaderListSize)), true, new Metadata());
×
425
        return;
×
426
      } else if (shouldFail) {
1✔
427
        stream.transportReportStatus(Status.RESOURCE_EXHAUSTED
1✔
428
            .withDescription(
1✔
429
                String.format(
1✔
430
                    "Server Headers of size %d exceeded Metadata size soft limit: %d",
431
                    h2HeadersSize,
1✔
432
                    softLimitHeaderListSize)), true, new Metadata());
1✔
433
        return;
1✔
434
      }
435
      stream.transportHeadersReceived(headers, endStream);
1✔
436
    }
437

438
    if (keepAliveManager != null) {
1✔
439
      keepAliveManager.onDataReceived();
1✔
440
    }
441
  }
1✔
442

443
  /**
444
   * Handler for an inbound HTTP/2 DATA frame.
445
   */
446
  private void onDataRead(int streamId, ByteBuf data, int padding, boolean endOfStream) {
447
    flowControlPing().onDataRead(data.readableBytes(), padding);
1✔
448
    NettyClientStream.TransportState stream = clientStream(requireHttp2Stream(streamId));
1✔
449
    PerfMark.event("NettyClientHandler.onDataRead", stream.tag());
1✔
450
    stream.transportDataReceived(data, endOfStream);
1✔
451
    if (keepAliveManager != null) {
1✔
452
      keepAliveManager.onDataReceived();
1✔
453
    }
454
  }
1✔
455

456
  /**
457
   * Handler for an inbound HTTP/2 RST_STREAM frame, terminating a stream.
458
   */
459
  private void onRstStreamRead(int streamId, long errorCode) {
460
    NettyClientStream.TransportState stream = clientStream(connection().stream(streamId));
1✔
461
    if (stream != null) {
1✔
462
      PerfMark.event("NettyClientHandler.onRstStreamRead", stream.tag());
1✔
463
      Status status = statusFromH2Error(null, "RST_STREAM closed stream", errorCode, null);
1✔
464
      stream.transportReportStatus(
1✔
465
          status,
466
          errorCode == Http2Error.REFUSED_STREAM.code()
1✔
467
              ? RpcProgress.REFUSED : RpcProgress.PROCESSED,
1✔
468
          false /*stop delivery*/,
469
          new Metadata());
470
      if (keepAliveManager != null) {
1✔
471
        keepAliveManager.onDataReceived();
1✔
472
      }
473
    }
474
  }
1✔
475

476
  @Override
477
  public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
478
    logger.fine("Network channel being closed by the application.");
1✔
479
    if (ctx.channel().isActive()) { // Ignore notification that the socket was closed
1✔
480
      lifecycleManager.notifyShutdown(
1✔
481
          Status.UNAVAILABLE.withDescription("Transport closed for unknown reason"));
1✔
482
    }
483
    super.close(ctx, promise);
1✔
484
  }
1✔
485

486
  /**
487
   * Handler for the Channel shutting down.
488
   */
489
  @Override
490
  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
491
    try {
492
      logger.fine("Network channel is closed");
1✔
493
      Status status = Status.UNAVAILABLE.withDescription("Network closed for unknown reason");
1✔
494
      lifecycleManager.notifyShutdown(status);
1✔
495
      final Status streamStatus;
496
      if (channelInactiveReason != null) {
1✔
497
        streamStatus = channelInactiveReason;
1✔
498
      } else {
499
        streamStatus = lifecycleManager.getShutdownStatus();
1✔
500
      }
501
      try {
502
        cancelPing(lifecycleManager.getShutdownStatus());
1✔
503
        // Report status to the application layer for any open streams
504
        connection().forEachActiveStream(new Http2StreamVisitor() {
1✔
505
          @Override
506
          public boolean visit(Http2Stream stream) throws Http2Exception {
507
            NettyClientStream.TransportState clientStream = clientStream(stream);
1✔
508
            if (clientStream != null) {
1✔
509
              clientStream.transportReportStatus(streamStatus, false, new Metadata());
1✔
510
            }
511
            return true;
1✔
512
          }
513
        });
514
      } finally {
515
        lifecycleManager.notifyTerminated(status);
1✔
516
      }
517
    } finally {
518
      // Close any open streams
519
      super.channelInactive(ctx);
1✔
520
      if (keepAliveManager != null) {
1✔
521
        keepAliveManager.onTransportTermination();
1✔
522
      }
523
    }
524
  }
1✔
525

526
  @Override
527
  public void handleProtocolNegotiationCompleted(
528
      Attributes attributes, InternalChannelz.Security securityInfo) {
529
    this.attributes = this.attributes.toBuilder().setAll(attributes).build();
1✔
530
    this.securityInfo = securityInfo;
1✔
531
    super.handleProtocolNegotiationCompleted(attributes, securityInfo);
1✔
532
    writeBufferingAndRemove(ctx().channel());
1✔
533
  }
1✔
534

535
  static void writeBufferingAndRemove(Channel channel) {
536
    checkNotNull(channel, "channel");
1✔
537
    ChannelHandlerContext handlerCtx =
1✔
538
        channel.pipeline().context(WriteBufferingAndExceptionHandler.class);
1✔
539
    if (handlerCtx == null) {
1✔
540
      return;
1✔
541
    }
542
    ((WriteBufferingAndExceptionHandler) handlerCtx.handler()).writeBufferedAndRemove(handlerCtx);
1✔
543
  }
1✔
544

545
  @Override
546
  public Attributes getEagAttributes() {
547
    return eagAttributes;
1✔
548
  }
549

550
  @Override
551
  public String getAuthority() {
552
    return authority;
1✔
553
  }
554

555
  InternalChannelz.Security getSecurityInfo() {
556
    return securityInfo;
1✔
557
  }
558

559
  @Override
560
  protected void onConnectionError(ChannelHandlerContext ctx,  boolean outbound, Throwable cause,
561
      Http2Exception http2Ex) {
562
    logger.log(Level.FINE, "Caught a connection error", cause);
1✔
563
    lifecycleManager.notifyShutdown(Utils.statusFromThrowable(cause));
1✔
564
    // Parent class will shut down the Channel
565
    super.onConnectionError(ctx, outbound, cause, http2Ex);
1✔
566
  }
1✔
567

568
  @Override
569
  protected void onStreamError(ChannelHandlerContext ctx, boolean outbound, Throwable cause,
570
      Http2Exception.StreamException http2Ex) {
571
    // Close the stream with a status that contains the cause.
572
    NettyClientStream.TransportState stream = clientStream(connection().stream(http2Ex.streamId()));
1✔
573
    if (stream != null) {
1✔
574
      stream.transportReportStatus(Utils.statusFromThrowable(cause), false, new Metadata());
1✔
575
    } else {
576
      logger.log(Level.FINE, "Stream error for unknown stream " + http2Ex.streamId(), cause);
1✔
577
    }
578

579
    // Delegate to the base class to send a RST_STREAM.
580
    super.onStreamError(ctx, outbound, cause, http2Ex);
1✔
581
  }
1✔
582

583
  @Override
584
  protected boolean isGracefulShutdownComplete() {
585
    // Only allow graceful shutdown to complete after all pending streams have completed.
586
    return super.isGracefulShutdownComplete()
1✔
587
        && ((StreamBufferingEncoder) encoder()).numBufferedStreams() == 0;
1✔
588
  }
589

590
  /**
591
   * Attempts to create a new stream from the given command. If there are too many active streams,
592
   * the creation request is queued.
593
   */
594
  private void createStream(CreateStreamCommand command, ChannelPromise promise)
595
          throws Exception {
596
    if (lifecycleManager.getShutdownStatus() != null) {
1✔
597
      command.stream().setNonExistent();
1✔
598
      // The connection is going away (it is really the GOAWAY case),
599
      // just terminate the stream now.
600
      command.stream().transportReportStatus(
1✔
601
          lifecycleManager.getShutdownStatus(), RpcProgress.MISCARRIED, true, new Metadata());
1✔
602
      promise.setFailure(InternalStatus.asRuntimeExceptionWithoutStacktrace(
1✔
603
              lifecycleManager.getShutdownStatus(), null));
1✔
604
      return;
1✔
605
    }
606

607
    CharSequence authorityHeader = command.headers().authority();
1✔
608
    if (authorityHeader == null) {
1✔
609
      Status authorityVerificationStatus = Status.UNAVAILABLE.withDescription(
1✔
610
              "Missing authority header");
611
      command.stream().setNonExistent();
1✔
612
      command.stream().transportReportStatus(
1✔
613
              Status.UNAVAILABLE, RpcProgress.PROCESSED, true, new Metadata());
614
      promise.setFailure(InternalStatus.asRuntimeExceptionWithoutStacktrace(
1✔
615
              authorityVerificationStatus, null));
616
      return;
1✔
617
    }
618
    // No need to verify authority for the rpc outgoing header if it is same as the authority
619
    // for the transport
620
    if (!authority.contentEquals(authorityHeader)) {
1✔
621
      Status authorityVerificationStatus = peerVerificationResults.get(
1✔
622
              authorityHeader.toString());
1✔
623
      if (authorityVerificationStatus == null) {
1✔
624
        if (attributes.get(GrpcAttributes.ATTR_AUTHORITY_VERIFIER) == null) {
1✔
625
          authorityVerificationStatus = Status.UNAVAILABLE.withDescription(
1✔
626
                  "Authority verifier not found to verify authority");
627
          command.stream().setNonExistent();
1✔
628
          command.stream().transportReportStatus(
1✔
629
                  authorityVerificationStatus, RpcProgress.PROCESSED, true, new Metadata());
630
          promise.setFailure(InternalStatus.asRuntimeExceptionWithoutStacktrace(
1✔
631
                  authorityVerificationStatus, null));
632
          return;
1✔
633
        }
634
        authorityVerificationStatus = attributes.get(GrpcAttributes.ATTR_AUTHORITY_VERIFIER)
1✔
635
                .verifyAuthority(authorityHeader.toString());
1✔
636
        peerVerificationResults.put(authorityHeader.toString(), authorityVerificationStatus);
1✔
637
        if (!authorityVerificationStatus.isOk() && !enablePerRpcAuthorityCheck) {
1✔
638
          logger.log(Level.WARNING, String.format("%s.%s",
1✔
639
                          authorityVerificationStatus.getDescription(),
1✔
640
                          enablePerRpcAuthorityCheck
1✔
641
                                  ? "" : " This will be an error in the future."),
1✔
642
                  InternalStatus.asRuntimeExceptionWithoutStacktrace(
1✔
643
                          authorityVerificationStatus, null));
644
        }
645
      }
646
      if (!authorityVerificationStatus.isOk()) {
1✔
647
        if (enablePerRpcAuthorityCheck) {
1✔
648
          command.stream().setNonExistent();
1✔
649
          command.stream().transportReportStatus(
1✔
650
                  authorityVerificationStatus, RpcProgress.PROCESSED, true, new Metadata());
651
          promise.setFailure(InternalStatus.asRuntimeExceptionWithoutStacktrace(
1✔
652
                  authorityVerificationStatus, null));
653
          return;
1✔
654
        }
655
      }
656
    }
657
    // Get the stream ID for the new stream.
658
    int streamId;
659
    try {
660
      streamId = incrementAndGetNextStreamId();
1✔
661
    } catch (StatusException e) {
1✔
662
      command.stream().setNonExistent();
1✔
663
      // Stream IDs have been exhausted for this connection. Fail the promise immediately.
664
      promise.setFailure(e);
1✔
665

666
      // Initiate a graceful shutdown if we haven't already.
667
      if (!connection().goAwaySent()) {
1✔
668
        logger.fine("Stream IDs have been exhausted for this connection. "
1✔
669
                + "Initiating graceful shutdown of the connection.");
670
        lifecycleManager.notifyShutdown(e.getStatus());
1✔
671
        close(ctx(), ctx().newPromise());
1✔
672
      }
673
      return;
1✔
674
    }
1✔
675
    if (connection().goAwayReceived()) {
1✔
676
      Status s = abruptGoAwayStatus;
1✔
677
      int maxActiveStreams = connection().local().maxActiveStreams();
1✔
678
      int lastStreamId = connection().local().lastStreamKnownByPeer();
1✔
679
      if (s == null) {
1✔
680
        // Should be impossible, but handle pseudo-gracefully
681
        s = Status.INTERNAL.withDescription(
×
682
            "Failed due to abrupt GOAWAY, but can't find GOAWAY details");
683
      } else if (streamId > lastStreamId) {
1✔
684
        s = s.augmentDescription(
1✔
685
            "stream id: " + streamId + ", GOAWAY Last-Stream-ID:" + lastStreamId);
686
      } else if (connection().local().numActiveStreams() == maxActiveStreams) {
1✔
687
        s = s.augmentDescription("At MAX_CONCURRENT_STREAMS limit. limit: " + maxActiveStreams);
1✔
688
      }
689
      if (streamId > lastStreamId || connection().local().numActiveStreams() == maxActiveStreams) {
1✔
690
        // This should only be reachable during onGoAwayReceived, as otherwise
691
        // getShutdownThrowable() != null
692
        command.stream().setNonExistent();
1✔
693
        command.stream().transportReportStatus(s, RpcProgress.MISCARRIED, true, new Metadata());
1✔
694
        promise.setFailure(s.asRuntimeException());
1✔
695
        return;
1✔
696
      }
697
    }
698

699
    NettyClientStream.TransportState stream = command.stream();
1✔
700
    Http2Headers headers = command.headers();
1✔
701
    stream.setId(streamId);
1✔
702

703
    try (TaskCloseable ignore = PerfMark.traceTask("NettyClientHandler.createStream")) {
1✔
704
      PerfMark.linkIn(command.getLink());
1✔
705
      PerfMark.attachTag(stream.tag());
1✔
706
      createStreamTraced(
1✔
707
          streamId, stream, headers, command.isGet(), command.shouldBeCountedForInUse(), promise);
1✔
708
    }
709
  }
1✔
710

711
  private void createStreamTraced(
712
      final int streamId,
713
      final NettyClientStream.TransportState stream,
714
      final Http2Headers headers,
715
      boolean isGet,
716
      final boolean shouldBeCountedForInUse,
717
      final ChannelPromise promise) {
718
    // Create an intermediate promise so that we can intercept the failure reported back to the
719
    // application.
720
    ChannelPromise tempPromise = ctx().newPromise();
1✔
721
    encoder().writeHeaders(ctx(), streamId, headers, 0, isGet, tempPromise)
1✔
722
        .addListener(new ChannelFutureListener() {
1✔
723
          @Override
724
          public void operationComplete(ChannelFuture future) throws Exception {
725
            if (future.isSuccess()) {
1✔
726
              // The http2Stream will be null in case a stream buffered in the encoder
727
              // was canceled via RST_STREAM.
728
              Http2Stream http2Stream = connection().stream(streamId);
1✔
729
              if (http2Stream != null) {
1✔
730
                stream.getStatsTraceContext().clientOutboundHeaders();
1✔
731
                http2Stream.setProperty(streamKey, stream);
1✔
732

733
                // This delays the in-use state until the I/O completes, which technically may
734
                // be later than we would like.
735
                if (shouldBeCountedForInUse) {
1✔
736
                  inUseState.updateObjectInUse(http2Stream, true);
1✔
737
                }
738

739
                // Attach the client stream to the HTTP/2 stream object as user data.
740
                stream.setHttp2Stream(http2Stream);
1✔
741
              }
742
              // Otherwise, the stream has been cancelled and Netty is sending a
743
              // RST_STREAM frame which causes it to purge pending writes from the
744
              // flow-controller and delete the http2Stream. The stream listener has already
745
              // been notified of cancellation so there is nothing to do.
746

747
              // Just forward on the success status to the original promise.
748
              promise.setSuccess();
1✔
749
            } else {
1✔
750
              Throwable cause = future.cause();
1✔
751
              if (cause instanceof StreamBufferingEncoder.Http2GoAwayException) {
1✔
752
                StreamBufferingEncoder.Http2GoAwayException e =
1✔
753
                    (StreamBufferingEncoder.Http2GoAwayException) cause;
754
                Status status = statusFromH2Error(
1✔
755
                    Status.Code.UNAVAILABLE, "GOAWAY closed buffered stream",
756
                    e.errorCode(), e.debugData());
1✔
757
                cause = status.asRuntimeException();
1✔
758
                stream.transportReportStatus(status, RpcProgress.MISCARRIED, true, new Metadata());
1✔
759
              } else if (cause instanceof StreamBufferingEncoder.Http2ChannelClosedException) {
1✔
760
                Status status = lifecycleManager.getShutdownStatus();
1✔
761
                if (status == null) {
1✔
762
                  status = Status.UNAVAILABLE.withCause(cause)
×
763
                      .withDescription("Connection closed while stream is buffered");
×
764
                }
765
                stream.transportReportStatus(status, RpcProgress.MISCARRIED, true, new Metadata());
1✔
766
              }
767
              promise.setFailure(cause);
1✔
768
            }
769
          }
1✔
770
        });
771
  }
1✔
772

773
  /**
774
   * Cancels this stream.
775
   */
776
  private void cancelStream(ChannelHandlerContext ctx, CancelClientStreamCommand cmd,
777
      ChannelPromise promise) {
778
    NettyClientStream.TransportState stream = cmd.stream();
1✔
779
    try (TaskCloseable ignore = PerfMark.traceTask("NettyClientHandler.cancelStream")) {
1✔
780
      PerfMark.attachTag(stream.tag());
1✔
781
      PerfMark.linkIn(cmd.getLink());
1✔
782
      Status reason = cmd.reason();
1✔
783
      if (reason != null) {
1✔
784
        stream.transportReportStatus(reason, true, new Metadata());
1✔
785
      }
786
      if (!cmd.stream().isNonExistent()) {
1✔
787
        encoder().writeRstStream(ctx, stream.id(), Http2Error.CANCEL.code(), promise);
1✔
788
      } else {
789
        promise.setSuccess();
1✔
790
      }
791
    }
792
  }
1✔
793

794
  /**
795
   * Sends the given GRPC frame for the stream.
796
   */
797
  private void sendGrpcFrame(ChannelHandlerContext ctx, SendGrpcFrameCommand cmd,
798
      ChannelPromise promise) {
799
    try (TaskCloseable ignore = PerfMark.traceTask("NettyClientHandler.sendGrpcFrame")) {
1✔
800
      PerfMark.attachTag(cmd.stream().tag());
1✔
801
      PerfMark.linkIn(cmd.getLink());
1✔
802
      // Call the base class to write the HTTP/2 DATA frame.
803
      // Note: no need to flush since this is handled by the outbound flow controller.
804
      encoder().writeData(ctx, cmd.stream().id(), cmd.content(), 0, cmd.endStream(), promise);
1✔
805
    }
806
  }
1✔
807

808
  private void sendPingFrame(ChannelHandlerContext ctx, SendPingCommand msg,
809
      ChannelPromise promise) {
810
    try (TaskCloseable ignore = PerfMark.traceTask("NettyClientHandler.sendPingFrame")) {
1✔
811
      PerfMark.linkIn(msg.getLink());
1✔
812
      sendPingFrameTraced(ctx, msg, promise);
1✔
813
    }
814
  }
1✔
815

816
  /**
817
   * Sends a PING frame. If a ping operation is already outstanding, the callback in the message is
818
   * registered to be called when the existing operation completes, and no new frame is sent.
819
   */
820
  private void sendPingFrameTraced(ChannelHandlerContext ctx, SendPingCommand msg,
821
      ChannelPromise promise) {
822
    // Don't check lifecycleManager.getShutdownStatus() since we want to allow pings after shutdown
823
    // but before termination. After termination, messages will no longer arrive because the
824
    // pipeline clears all handlers on channel close.
825

826
    PingCallback callback = msg.callback();
1✔
827
    Executor executor = msg.executor();
1✔
828
    // we only allow one outstanding ping at a time, so just add the callback to
829
    // any outstanding operation
830
    if (ping != null) {
1✔
831
      promise.setSuccess();
1✔
832
      ping.addCallback(callback, executor);
1✔
833
      return;
1✔
834
    }
835

836
    // Use a new promise to prevent calling the callback twice on write failure: here and in
837
    // NettyClientTransport.ping(). It may appear strange, but it will behave the same as if
838
    // ping != null above.
839
    promise.setSuccess();
1✔
840
    promise = ctx().newPromise();
1✔
841
    // set outstanding operation
842
    long data = USER_PING_PAYLOAD;
1✔
843
    Stopwatch stopwatch = stopwatchFactory.get();
1✔
844
    stopwatch.start();
1✔
845
    ping = new Http2Ping(data, stopwatch);
1✔
846
    ping.addCallback(callback, executor);
1✔
847
    // and then write the ping
848
    encoder().writePing(ctx, false, USER_PING_PAYLOAD, promise);
1✔
849
    ctx.flush();
1✔
850
    final Http2Ping finalPing = ping;
1✔
851
    promise.addListener(new ChannelFutureListener() {
1✔
852
      @Override
853
      public void operationComplete(ChannelFuture future) throws Exception {
854
        if (future.isSuccess()) {
1✔
855
          transportTracer.reportKeepAliveSent();
1✔
856
          return;
1✔
857
        }
858
        Throwable cause = future.cause();
×
859
        Status status = lifecycleManager.getShutdownStatus();
×
860
        if (cause instanceof ClosedChannelException) {
×
861
          if (status == null) {
×
862
            status = Status.UNKNOWN.withDescription("Ping failed but for unknown reason.")
×
863
                    .withCause(future.cause());
×
864
          }
865
        } else {
866
          status = Utils.statusFromThrowable(cause);
×
867
        }
868
        finalPing.failed(status);
×
869
        if (ping == finalPing) {
×
870
          ping = null;
×
871
        }
872
      }
×
873
    });
874
  }
1✔
875

876
  private void gracefulClose(ChannelHandlerContext ctx, GracefulCloseCommand msg,
877
      ChannelPromise promise) throws Exception {
878
    lifecycleManager.notifyShutdown(msg.getStatus());
1✔
879
    // Explicitly flush to create any buffered streams before sending GOAWAY.
880
    // TODO(ejona): determine if the need to flush is a bug in Netty
881
    flush(ctx);
1✔
882
    close(ctx, promise);
1✔
883
  }
1✔
884

885
  private void forcefulClose(final ChannelHandlerContext ctx, final ForcefulCloseCommand msg,
886
      ChannelPromise promise) throws Exception {
887
    connection().forEachActiveStream(new Http2StreamVisitor() {
1✔
888
      @Override
889
      public boolean visit(Http2Stream stream) throws Http2Exception {
890
        NettyClientStream.TransportState clientStream = clientStream(stream);
1✔
891
        Tag tag = clientStream != null ? clientStream.tag() : PerfMark.createTag();
1✔
892
        try (TaskCloseable ignore = PerfMark.traceTask("NettyClientHandler.forcefulClose")) {
1✔
893
          PerfMark.linkIn(msg.getLink());
1✔
894
          PerfMark.attachTag(tag);
1✔
895
          if (clientStream != null) {
1✔
896
            clientStream.transportReportStatus(msg.getStatus(), true, new Metadata());
1✔
897
            resetStream(ctx, stream.id(), Http2Error.CANCEL.code(), ctx.newPromise());
1✔
898
          }
899
          stream.close();
1✔
900
          return true;
1✔
901
        }
902
      }
903
    });
904
    close(ctx, promise);
1✔
905
  }
1✔
906

907
  /**
908
   * Handler for a GOAWAY being received. Fails any streams created after the
909
   * last known stream. May only be called during a read.
910
   */
911
  private void goingAway(long errorCode, byte[] debugData) {
912
    Status finalStatus = statusFromH2Error(
1✔
913
        Status.Code.UNAVAILABLE, "GOAWAY shut down transport", errorCode, debugData);
914
    lifecycleManager.notifyGracefulShutdown(finalStatus);
1✔
915
    abruptGoAwayStatus = statusFromH2Error(
1✔
916
        Status.Code.UNAVAILABLE, "Abrupt GOAWAY closed unsent stream", errorCode, debugData);
917
    // While this _should_ be UNAVAILABLE, Netty uses the wrong stream id in the GOAWAY when it
918
    // fails streams due to HPACK failures (e.g., header list too large). To be more conservative,
919
    // we assume any sent streams may be related to the GOAWAY. This should rarely impact users
920
    // since the main time servers should use abrupt GOAWAYs is if there is a protocol error, and if
921
    // there wasn't a protocol error the error code was probably NO_ERROR which is mapped to
922
    // UNAVAILABLE. https://github.com/netty/netty/issues/10670
923
    final Status abruptGoAwayStatusConservative = statusFromH2Error(
1✔
924
        null, "Abrupt GOAWAY closed sent stream", errorCode, debugData);
925
    final boolean mayBeHittingNettyBug = errorCode != Http2Error.NO_ERROR.code();
1✔
926
    // Try to allocate as many in-flight streams as possible, to reduce race window of
927
    // https://github.com/grpc/grpc-java/issues/2562 . To be of any help, the server has to
928
    // gracefully shut down the connection with two GOAWAYs. gRPC servers generally send a PING
929
    // after the first GOAWAY, so they can very precisely detect when the GOAWAY has been
930
    // processed and thus this processing must be in-line before processing additional reads.
931

932
    // This can cause reentrancy, but should be minor since it is normal to handle writes in
933
    // response to a read. Also, the call stack is rather shallow at this point
934
    clientWriteQueue.drainNow();
1✔
935
    if (lifecycleManager.notifyShutdown(finalStatus)) {
1✔
936
      // This is for the only RPCs that are actually covered by the GOAWAY error code. All other
937
      // RPCs were not observed by the remote and so should be UNAVAILABLE.
938
      channelInactiveReason = statusFromH2Error(
1✔
939
          null, "Connection closed after GOAWAY", errorCode, debugData);
940
    }
941

942
    final int lastKnownStream = connection().local().lastStreamKnownByPeer();
1✔
943
    try {
944
      connection().forEachActiveStream(new Http2StreamVisitor() {
1✔
945
        @Override
946
        public boolean visit(Http2Stream stream) throws Http2Exception {
947
          if (stream.id() > lastKnownStream) {
1✔
948
            NettyClientStream.TransportState clientStream = clientStream(stream);
1✔
949
            if (clientStream != null) {
1✔
950
              // RpcProgress _should_ be REFUSED, but are being conservative. See comment for
951
              // abruptGoAwayStatusConservative. This does reduce our ability to perform transparent
952
              // retries, but only if something else caused a connection failure.
953
              RpcProgress progress = mayBeHittingNettyBug
1✔
954
                  ? RpcProgress.PROCESSED
1✔
955
                  : RpcProgress.REFUSED;
1✔
956
              clientStream.transportReportStatus(
1✔
957
                  abruptGoAwayStatusConservative, progress, false, new Metadata());
958
            }
959
            stream.close();
1✔
960
          }
961
          return true;
1✔
962
        }
963
      });
964
    } catch (Http2Exception e) {
×
965
      throw new RuntimeException(e);
×
966
    }
1✔
967
  }
1✔
968

969
  private void cancelPing(Status s) {
970
    if (ping != null) {
1✔
971
      ping.failed(s);
1✔
972
      ping = null;
1✔
973
    }
974
  }
1✔
975

976
  /** If {@code statusCode} is non-null, it will be used instead of the http2 error code mapping. */
977
  private Status statusFromH2Error(
978
      Status.Code statusCode, String context, long errorCode, byte[] debugData) {
979
    Status status = GrpcUtil.Http2Error.statusForCode(errorCode);
1✔
980
    if (statusCode == null) {
1✔
981
      statusCode = status.getCode();
1✔
982
    }
983
    String debugString = "";
1✔
984
    if (debugData != null && debugData.length > 0) {
1✔
985
      // If a debug message was provided, use it.
986
      debugString = ", debug data: " + new String(debugData, UTF_8);
1✔
987
    }
988
    return statusCode.toStatus()
1✔
989
        .withDescription(context + ". " + status.getDescription() + debugString);
1✔
990
  }
991

992
  /**
993
   * Gets the client stream associated to the given HTTP/2 stream object.
994
   */
995
  private NettyClientStream.TransportState clientStream(Http2Stream stream) {
996
    return stream == null ? null : (NettyClientStream.TransportState) stream.getProperty(streamKey);
1✔
997
  }
998

999
  private int incrementAndGetNextStreamId() throws StatusException {
1000
    int nextStreamId = connection().local().incrementAndGetNextStreamId();
1✔
1001
    if (nextStreamId < 0) {
1✔
1002
      logger.fine("Stream IDs have been exhausted for this connection. "
1✔
1003
              + "Initiating graceful shutdown of the connection.");
1004
      throw EXHAUSTED_STREAMS_STATUS.asException();
1✔
1005
    }
1006
    return nextStreamId;
1✔
1007
  }
1008

1009
  private Http2Stream requireHttp2Stream(int streamId) {
1010
    Http2Stream stream = connection().stream(streamId);
1✔
1011
    if (stream == null) {
1✔
1012
      // This should never happen.
1013
      throw new AssertionError("Stream does not exist: " + streamId);
×
1014
    }
1015
    return stream;
1✔
1016
  }
1017

1018
  private class FrameListener extends Http2FrameAdapter {
1✔
1019
    private boolean firstSettings = true;
1✔
1020

1021
    @Override
1022
    public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) {
1023
      if (firstSettings) {
1✔
1024
        firstSettings = false;
1✔
1025
        attributes = lifecycleManager.filterAttributes(attributes);
1✔
1026
        lifecycleManager.notifyReady();
1✔
1027
      }
1028
    }
1✔
1029

1030
    @Override
1031
    public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
1032
        boolean endOfStream) throws Http2Exception {
1033
      NettyClientHandler.this.onDataRead(streamId, data, padding, endOfStream);
1✔
1034
      return padding;
1✔
1035
    }
1036

1037
    @Override
1038
    public void onHeadersRead(ChannelHandlerContext ctx,
1039
        int streamId,
1040
        Http2Headers headers,
1041
        int streamDependency,
1042
        short weight,
1043
        boolean exclusive,
1044
        int padding,
1045
        boolean endStream) throws Http2Exception {
1046
      NettyClientHandler.this.onHeadersRead(streamId, headers, endStream);
1✔
1047
    }
1✔
1048

1049
    @Override
1050
    public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode)
1051
        throws Http2Exception {
1052
      NettyClientHandler.this.onRstStreamRead(streamId, errorCode);
1✔
1053
    }
1✔
1054

1055
    @Override
1056
    public void onPingAckRead(ChannelHandlerContext ctx, long ackPayload) throws Http2Exception {
1057
      Http2Ping p = ping;
1✔
1058
      if (ackPayload == flowControlPing().payload()) {
1✔
1059
        flowControlPing().updateWindow();
1✔
1060
        logger.log(Level.FINE, "Window: {0}",
1✔
1061
            decoder().flowController().initialWindowSize(connection().connectionStream()));
1✔
1062
      } else if (p != null) {
1✔
1063
        if (p.payload() == ackPayload) {
1✔
1064
          p.complete();
1✔
1065
          ping = null;
1✔
1066
        } else {
1067
          logger.log(Level.WARNING,
1✔
1068
              "Received unexpected ping ack. Expecting {0}, got {1}",
1069
              new Object[] {p.payload(), ackPayload});
1✔
1070
        }
1071
      } else {
1072
        logger.warning("Received unexpected ping ack. No ping outstanding");
×
1073
      }
1074
      if (keepAliveManager != null) {
1✔
1075
        keepAliveManager.onDataReceived();
1✔
1076
      }
1077
    }
1✔
1078

1079
    @Override
1080
    public void onPingRead(ChannelHandlerContext ctx, long data) throws Http2Exception {
1081
      if (keepAliveManager != null) {
1✔
1082
        keepAliveManager.onDataReceived();
×
1083
      }
1084
    }
1✔
1085
  }
1086

1087
  private static class PingCountingFrameWriter extends DecoratingHttp2FrameWriter
1088
      implements AbstractNettyHandler.PingLimiter {
1089
    private int pingCount;
1090

1091
    public PingCountingFrameWriter(Http2FrameWriter delegate) {
1092
      super(delegate);
1✔
1093
    }
1✔
1094

1095
    @Override
1096
    public boolean isPingAllowed() {
1097
      // "3 strikes" may cause the server to complain, so we limit ourselves to 2 or below.
1098
      return pingCount < 2;
1✔
1099
    }
1100

1101
    @Override
1102
    public ChannelFuture writeHeaders(
1103
        ChannelHandlerContext ctx, int streamId, Http2Headers headers,
1104
        int padding, boolean endStream, ChannelPromise promise) {
1105
      pingCount = 0;
×
1106
      return super.writeHeaders(ctx, streamId, headers, padding, endStream, promise);
×
1107
    }
1108

1109
    @Override
1110
    public ChannelFuture writeHeaders(
1111
        ChannelHandlerContext ctx, int streamId, Http2Headers headers,
1112
        int streamDependency, short weight, boolean exclusive,
1113
        int padding, boolean endStream, ChannelPromise promise) {
1114
      pingCount = 0;
1✔
1115
      return super.writeHeaders(ctx, streamId, headers, streamDependency, weight, exclusive,
1✔
1116
          padding, endStream, promise);
1117
    }
1118

1119
    @Override
1120
    public ChannelFuture writeWindowUpdate(
1121
        ChannelHandlerContext ctx, int streamId, int windowSizeIncrement, ChannelPromise promise) {
1122
      pingCount = 0;
1✔
1123
      return super.writeWindowUpdate(ctx, streamId, windowSizeIncrement, promise);
1✔
1124
    }
1125

1126
    @Override
1127
    public ChannelFuture writePing(
1128
        ChannelHandlerContext ctx, boolean ack, long data, ChannelPromise promise) {
1129
      if (!ack) {
1✔
1130
        pingCount++;
1✔
1131
      }
1132
      return super.writePing(ctx, ack, data, promise);
1✔
1133
    }
1134

1135
    @Override
1136
    public ChannelFuture writeData(
1137
        ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endStream,
1138
        ChannelPromise promise) {
1139
      if (data.isReadable()) {
1✔
1140
        pingCount = 0;
1✔
1141
      }
1142
      return super.writeData(ctx, streamId, data, padding, endStream, promise);
1✔
1143
    }
1144
  }
1145
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc