• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19528

28 Oct 2024 05:25PM UTC coverage: 84.59% (-0.04%) from 84.627%
#19528

push

github

web-flow
netty: add soft Metadata size limit enforcement. (#11603)

33914 of 40092 relevant lines covered (84.59%)

0.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.33
/../netty/src/main/java/io/grpc/netty/NettyClientHandler.java
1
/*
2
 * Copyright 2014 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.netty;
18

19
import static io.netty.handler.codec.http2.DefaultHttp2LocalFlowController.DEFAULT_WINDOW_UPDATE_RATIO;
20
import static io.netty.util.CharsetUtil.UTF_8;
21
import static io.netty.util.internal.ObjectUtil.checkNotNull;
22

23
import com.google.common.annotations.VisibleForTesting;
24
import com.google.common.base.Preconditions;
25
import com.google.common.base.Stopwatch;
26
import com.google.common.base.Supplier;
27
import com.google.common.base.Ticker;
28
import io.grpc.Attributes;
29
import io.grpc.ChannelLogger;
30
import io.grpc.InternalChannelz;
31
import io.grpc.Metadata;
32
import io.grpc.Status;
33
import io.grpc.StatusException;
34
import io.grpc.internal.ClientStreamListener.RpcProgress;
35
import io.grpc.internal.ClientTransport.PingCallback;
36
import io.grpc.internal.GrpcAttributes;
37
import io.grpc.internal.GrpcUtil;
38
import io.grpc.internal.Http2Ping;
39
import io.grpc.internal.InUseStateAggregator;
40
import io.grpc.internal.KeepAliveManager;
41
import io.grpc.internal.TransportTracer;
42
import io.grpc.netty.GrpcHttp2HeadersUtils.GrpcHttp2ClientHeadersDecoder;
43
import io.netty.buffer.ByteBuf;
44
import io.netty.buffer.ByteBufUtil;
45
import io.netty.buffer.Unpooled;
46
import io.netty.channel.Channel;
47
import io.netty.channel.ChannelFuture;
48
import io.netty.channel.ChannelFutureListener;
49
import io.netty.channel.ChannelHandlerContext;
50
import io.netty.channel.ChannelPromise;
51
import io.netty.handler.codec.http2.DecoratingHttp2FrameWriter;
52
import io.netty.handler.codec.http2.DefaultHttp2Connection;
53
import io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder;
54
import io.netty.handler.codec.http2.DefaultHttp2ConnectionEncoder;
55
import io.netty.handler.codec.http2.DefaultHttp2FrameReader;
56
import io.netty.handler.codec.http2.DefaultHttp2FrameWriter;
57
import io.netty.handler.codec.http2.DefaultHttp2HeadersEncoder;
58
import io.netty.handler.codec.http2.DefaultHttp2LocalFlowController;
59
import io.netty.handler.codec.http2.DefaultHttp2RemoteFlowController;
60
import io.netty.handler.codec.http2.Http2CodecUtil;
61
import io.netty.handler.codec.http2.Http2Connection;
62
import io.netty.handler.codec.http2.Http2ConnectionAdapter;
63
import io.netty.handler.codec.http2.Http2ConnectionDecoder;
64
import io.netty.handler.codec.http2.Http2ConnectionEncoder;
65
import io.netty.handler.codec.http2.Http2Error;
66
import io.netty.handler.codec.http2.Http2Exception;
67
import io.netty.handler.codec.http2.Http2FrameAdapter;
68
import io.netty.handler.codec.http2.Http2FrameLogger;
69
import io.netty.handler.codec.http2.Http2FrameReader;
70
import io.netty.handler.codec.http2.Http2FrameWriter;
71
import io.netty.handler.codec.http2.Http2Headers;
72
import io.netty.handler.codec.http2.Http2HeadersDecoder;
73
import io.netty.handler.codec.http2.Http2HeadersEncoder;
74
import io.netty.handler.codec.http2.Http2InboundFrameLogger;
75
import io.netty.handler.codec.http2.Http2OutboundFrameLogger;
76
import io.netty.handler.codec.http2.Http2Settings;
77
import io.netty.handler.codec.http2.Http2Stream;
78
import io.netty.handler.codec.http2.Http2StreamVisitor;
79
import io.netty.handler.codec.http2.StreamBufferingEncoder;
80
import io.netty.handler.codec.http2.WeightedFairQueueByteDistributor;
81
import io.netty.handler.logging.LogLevel;
82
import io.perfmark.PerfMark;
83
import io.perfmark.Tag;
84
import io.perfmark.TaskCloseable;
85
import java.nio.channels.ClosedChannelException;
86
import java.util.concurrent.Executor;
87
import java.util.logging.Level;
88
import java.util.logging.Logger;
89
import javax.annotation.Nullable;
90

91
/**
92
 * Client-side Netty handler for GRPC processing. All event handlers are executed entirely within
93
 * the context of the Netty Channel thread.
94
 */
95
class NettyClientHandler extends AbstractNettyHandler {
96
  private static final Logger logger = Logger.getLogger(NettyClientHandler.class.getName());
1✔
97

98
  /**
99
   * A message that simply passes through the channel without any real processing. It is useful to
100
   * check if buffers have been drained and test the health of the channel in a single operation.
101
   */
102
  static final Object NOOP_MESSAGE = new Object();
1✔
103

104
  /**
105
   * Status used when the transport has exhausted the number of streams.
106
   */
107
  private static final Status EXHAUSTED_STREAMS_STATUS =
1✔
108
          Status.UNAVAILABLE.withDescription("Stream IDs have been exhausted");
1✔
109
  private static final long USER_PING_PAYLOAD = 1111;
110

111
  private final Http2Connection.PropertyKey streamKey;
112
  private final ClientTransportLifecycleManager lifecycleManager;
113
  private final KeepAliveManager keepAliveManager;
114
  // Returns new unstarted stopwatches
115
  private final Supplier<Stopwatch> stopwatchFactory;
116
  private final TransportTracer transportTracer;
117
  private final Attributes eagAttributes;
118
  private final String authority;
119
  private final InUseStateAggregator<Http2Stream> inUseState =
1✔
120
      new InUseStateAggregator<Http2Stream>() {
1✔
121
        @Override
122
        protected void handleInUse() {
123
          lifecycleManager.notifyInUse(true);
1✔
124
        }
1✔
125

126
        @Override
127
        protected void handleNotInUse() {
128
          lifecycleManager.notifyInUse(false);
1✔
129
        }
1✔
130
      };
131

132
  private WriteQueue clientWriteQueue;
133
  private Http2Ping ping;
134
  private Attributes attributes;
135
  private InternalChannelz.Security securityInfo;
136
  private Status abruptGoAwayStatus;
137
  private Status channelInactiveReason;
138

139
  static NettyClientHandler newHandler(
140
      ClientTransportLifecycleManager lifecycleManager,
141
      @Nullable KeepAliveManager keepAliveManager,
142
      boolean autoFlowControl,
143
      int flowControlWindow,
144
      int maxHeaderListSize,
145
      int softLimitHeaderListSize,
146
      Supplier<Stopwatch> stopwatchFactory,
147
      Runnable tooManyPingsRunnable,
148
      TransportTracer transportTracer,
149
      Attributes eagAttributes,
150
      String authority,
151
      ChannelLogger negotiationLogger,
152
      Ticker ticker) {
153
    Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive");
1✔
154
    Http2HeadersDecoder headersDecoder = new GrpcHttp2ClientHeadersDecoder(maxHeaderListSize);
1✔
155
    Http2FrameReader frameReader = new DefaultHttp2FrameReader(headersDecoder);
1✔
156
    Http2HeadersEncoder encoder = new DefaultHttp2HeadersEncoder(
1✔
157
        Http2HeadersEncoder.NEVER_SENSITIVE, false, 16, Integer.MAX_VALUE);
158
    Http2FrameWriter frameWriter = new DefaultHttp2FrameWriter(encoder);
1✔
159
    Http2Connection connection = new DefaultHttp2Connection(false);
1✔
160
    WeightedFairQueueByteDistributor dist = new WeightedFairQueueByteDistributor(connection);
1✔
161
    dist.allocationQuantum(16 * 1024); // Make benchmarks fast again.
1✔
162
    DefaultHttp2RemoteFlowController controller =
1✔
163
        new DefaultHttp2RemoteFlowController(connection, dist);
164
    connection.remote().flowController(controller);
1✔
165

166
    return newHandler(
1✔
167
        connection,
168
        frameReader,
169
        frameWriter,
170
        lifecycleManager,
171
        keepAliveManager,
172
        autoFlowControl,
173
        flowControlWindow,
174
        maxHeaderListSize,
175
        softLimitHeaderListSize,
176
        stopwatchFactory,
177
        tooManyPingsRunnable,
178
        transportTracer,
179
        eagAttributes,
180
        authority,
181
        negotiationLogger,
182
        ticker);
183
  }
184

185
  @VisibleForTesting
186
  static NettyClientHandler newHandler(
187
      final Http2Connection connection,
188
      Http2FrameReader frameReader,
189
      Http2FrameWriter frameWriter,
190
      ClientTransportLifecycleManager lifecycleManager,
191
      KeepAliveManager keepAliveManager,
192
      boolean autoFlowControl,
193
      int flowControlWindow,
194
      int maxHeaderListSize,
195
      int softLimitHeaderListSize,
196
      Supplier<Stopwatch> stopwatchFactory,
197
      Runnable tooManyPingsRunnable,
198
      TransportTracer transportTracer,
199
      Attributes eagAttributes,
200
      String authority,
201
      ChannelLogger negotiationLogger,
202
      Ticker ticker) {
203
    Preconditions.checkNotNull(connection, "connection");
1✔
204
    Preconditions.checkNotNull(frameReader, "frameReader");
1✔
205
    Preconditions.checkNotNull(lifecycleManager, "lifecycleManager");
1✔
206
    Preconditions.checkArgument(flowControlWindow > 0, "flowControlWindow must be positive");
1✔
207
    Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive");
1✔
208
    Preconditions.checkArgument(softLimitHeaderListSize > 0,
1✔
209
        "softLimitHeaderListSize must be positive");
210
    Preconditions.checkNotNull(stopwatchFactory, "stopwatchFactory");
1✔
211
    Preconditions.checkNotNull(tooManyPingsRunnable, "tooManyPingsRunnable");
1✔
212
    Preconditions.checkNotNull(eagAttributes, "eagAttributes");
1✔
213
    Preconditions.checkNotNull(authority, "authority");
1✔
214

215
    Http2FrameLogger frameLogger = new Http2FrameLogger(LogLevel.DEBUG, NettyClientHandler.class);
1✔
216
    frameReader = new Http2InboundFrameLogger(frameReader, frameLogger);
1✔
217
    frameWriter = new Http2OutboundFrameLogger(frameWriter, frameLogger);
1✔
218

219
    PingCountingFrameWriter pingCounter;
220
    frameWriter = pingCounter = new PingCountingFrameWriter(frameWriter);
1✔
221

222
    StreamBufferingEncoder encoder =
1✔
223
        new StreamBufferingEncoder(
224
            new DefaultHttp2ConnectionEncoder(connection, frameWriter));
225

226
    // Create the local flow controller configured to auto-refill the connection window.
227
    connection.local().flowController(
1✔
228
        new DefaultHttp2LocalFlowController(connection, DEFAULT_WINDOW_UPDATE_RATIO, true));
229

230
    Http2ConnectionDecoder decoder = new DefaultHttp2ConnectionDecoder(connection, encoder,
1✔
231
        frameReader);
232

233
    transportTracer.setFlowControlWindowReader(new Utils.FlowControlReader(connection));
1✔
234

235
    Http2Settings settings = new Http2Settings();
1✔
236
    settings.pushEnabled(false);
1✔
237
    settings.initialWindowSize(flowControlWindow);
1✔
238
    settings.maxConcurrentStreams(0);
1✔
239
    settings.maxHeaderListSize(maxHeaderListSize);
1✔
240

241
    return new NettyClientHandler(
1✔
242
        decoder,
243
        encoder,
244
        settings,
245
        negotiationLogger,
246
        lifecycleManager,
247
        keepAliveManager,
248
        stopwatchFactory,
249
        tooManyPingsRunnable,
250
        transportTracer,
251
        eagAttributes,
252
        authority,
253
        autoFlowControl,
254
        pingCounter,
255
        ticker,
256
        maxHeaderListSize,
257
        softLimitHeaderListSize);
258
  }
259

260
  /**
   * Stores the collaborators, installs the inbound frame listener on the decoder, and registers
   * a connection listener that reacts to GOAWAY frames and to streams becoming active or closed.
   */
  private NettyClientHandler(
      Http2ConnectionDecoder decoder,
      Http2ConnectionEncoder encoder,
      Http2Settings settings,
      ChannelLogger negotiationLogger,
      ClientTransportLifecycleManager lifecycleManager,
      KeepAliveManager keepAliveManager,
      Supplier<Stopwatch> stopwatchFactory,
      final Runnable tooManyPingsRunnable,
      TransportTracer transportTracer,
      Attributes eagAttributes,
      String authority,
      boolean autoFlowControl,
      PingLimiter pingLimiter,
      Ticker ticker,
      int maxHeaderListSize,
      int softLimitHeaderListSize) {
    super(
        /* channelUnused= */ null,
        decoder,
        encoder,
        settings,
        negotiationLogger,
        autoFlowControl,
        pingLimiter,
        ticker,
        maxHeaderListSize,
        softLimitHeaderListSize);
    this.lifecycleManager = lifecycleManager;
    this.keepAliveManager = keepAliveManager;
    this.stopwatchFactory = stopwatchFactory;
    this.transportTracer = Preconditions.checkNotNull(transportTracer);
    this.eagAttributes = eagAttributes;
    this.authority = authority;
    // Until negotiation completes, the only attribute exposed is the EAG attributes entry.
    this.attributes =
        Attributes.newBuilder().set(GrpcAttributes.ATTR_CLIENT_EAG_ATTRS, eagAttributes).build();

    // Set the frame listener on the decoder.
    decoder().frameListener(new FrameListener());

    Http2Connection h2Connection = encoder.connection();
    streamKey = h2Connection.newKey();

    h2Connection.addListener(new Http2ConnectionAdapter() {
      @Override
      public void onGoAwayReceived(int lastStreamId, long errorCode, ByteBuf debugData) {
        byte[] debugBytes = ByteBufUtil.getBytes(debugData);
        goingAway(errorCode, debugBytes);
        if (errorCode != Http2Error.ENHANCE_YOUR_CALM.code()) {
          return;
        }
        String debugText = new String(debugBytes, UTF_8);
        logger.log(
            Level.WARNING, "Received GOAWAY with ENHANCE_YOUR_CALM. Debug data: {0}", debugText);
        if ("too_many_pings".equals(debugText)) {
          tooManyPingsRunnable.run();
        }
      }

      @Override
      public void onStreamActive(Http2Stream stream) {
        // Only the transition to exactly one active stream is reported to the keep-alive manager.
        boolean firstActiveStream = connection().numActiveStreams() == 1;
        if (firstActiveStream && NettyClientHandler.this.keepAliveManager != null) {
          NettyClientHandler.this.keepAliveManager.onTransportActive();
        }
      }

      @Override
      public void onStreamClosed(Http2Stream stream) {
        // Although streams with CALL_OPTIONS_RPC_OWNED_BY_BALANCER are not marked as "in-use" in
        // the first place, we don't propagate that option here, and it's safe to reset the in-use
        // state for them, which will be a cheap no-op.
        inUseState.updateObjectInUse(stream, false);
        boolean noActiveStreams = connection().numActiveStreams() == 0;
        if (noActiveStreams && NettyClientHandler.this.keepAliveManager != null) {
          NettyClientHandler.this.keepAliveManager.onTransportIdle();
        }
      }
    });
  }
1✔
339

340
  /**
341
   * The protocol negotiation attributes, available once the protocol negotiation completes;
342
   * otherwise returns {@code Attributes.EMPTY}.
343
   */
344
  Attributes getAttributes() {
345
    return attributes;
1✔
346
  }
347

348
  /**
349
   * Handler for commands sent from the stream.
350
   */
351
  @Override
352
  public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
353
          throws Exception {
354
    if (msg instanceof CreateStreamCommand) {
1✔
355
      createStream((CreateStreamCommand) msg, promise);
1✔
356
    } else if (msg instanceof SendGrpcFrameCommand) {
1✔
357
      sendGrpcFrame(ctx, (SendGrpcFrameCommand) msg, promise);
1✔
358
    } else if (msg instanceof CancelClientStreamCommand) {
1✔
359
      cancelStream(ctx, (CancelClientStreamCommand) msg, promise);
1✔
360
    } else if (msg instanceof SendPingCommand) {
1✔
361
      sendPingFrame(ctx, (SendPingCommand) msg, promise);
1✔
362
    } else if (msg instanceof GracefulCloseCommand) {
1✔
363
      gracefulClose(ctx, (GracefulCloseCommand) msg, promise);
1✔
364
    } else if (msg instanceof ForcefulCloseCommand) {
1✔
365
      forcefulClose(ctx, (ForcefulCloseCommand) msg, promise);
1✔
366
    } else if (msg == NOOP_MESSAGE) {
1✔
367
      ctx.write(Unpooled.EMPTY_BUFFER, promise);
1✔
368
    } else {
369
      throw new AssertionError("Write called for unexpected type: " + msg.getClass().getName());
×
370
    }
371
  }
1✔
372

373
  void startWriteQueue(Channel channel) {
374
    clientWriteQueue = new WriteQueue(channel);
1✔
375
  }
1✔
376

377
  WriteQueue getWriteQueue() {
378
    return clientWriteQueue;
1✔
379
  }
380

381
  ClientTransportLifecycleManager getLifecycleManager() {
382
    return lifecycleManager;
1✔
383
  }
384

385
  /**
386
   * Returns the given processed bytes back to inbound flow control.
387
   */
388
  void returnProcessedBytes(Http2Stream stream, int bytes) {
389
    try {
390
      decoder().flowController().consumeBytes(stream, bytes);
1✔
391
    } catch (Http2Exception e) {
×
392
      throw new RuntimeException(e);
×
393
    }
1✔
394
  }
1✔
395

396
  private void onHeadersRead(int streamId, Http2Headers headers, boolean endStream) {
397
    // Stream 1 is reserved for the Upgrade response, so we should ignore its headers here:
398
    if (streamId != Http2CodecUtil.HTTP_UPGRADE_STREAM_ID) {
1✔
399
      NettyClientStream.TransportState stream = clientStream(requireHttp2Stream(streamId));
1✔
400
      PerfMark.event("NettyClientHandler.onHeadersRead", stream.tag());
1✔
401
      // check metadata size vs soft limit
402
      int h2HeadersSize = Utils.getH2HeadersSize(headers);
1✔
403
      boolean shouldFail =
1✔
404
          Utils.shouldRejectOnMetadataSizeSoftLimitExceeded(
1✔
405
              h2HeadersSize, softLimitHeaderListSize, maxHeaderListSize);
406
      if (shouldFail && endStream) {
1✔
407
        stream.transportReportStatus(Status.RESOURCE_EXHAUSTED
×
408
            .withDescription(
×
409
                String.format(
×
410
                    "Server Status + Trailers of size %d exceeded Metadata size soft limit: %d",
411
                    h2HeadersSize,
×
412
                    softLimitHeaderListSize)), true, new Metadata());
×
413
        return;
×
414
      } else if (shouldFail) {
1✔
415
        stream.transportReportStatus(Status.RESOURCE_EXHAUSTED
1✔
416
            .withDescription(
1✔
417
                String.format(
1✔
418
                    "Server Headers of size %d exceeded Metadata size soft limit: %d",
419
                    h2HeadersSize,
1✔
420
                    softLimitHeaderListSize)), true, new Metadata());
1✔
421
        return;
1✔
422
      }
423
      stream.transportHeadersReceived(headers, endStream);
1✔
424
    }
425

426
    if (keepAliveManager != null) {
1✔
427
      keepAliveManager.onDataReceived();
1✔
428
    }
429
  }
1✔
430

431
  /**
432
   * Handler for an inbound HTTP/2 DATA frame.
433
   */
434
  private void onDataRead(int streamId, ByteBuf data, int padding, boolean endOfStream) {
435
    flowControlPing().onDataRead(data.readableBytes(), padding);
1✔
436
    NettyClientStream.TransportState stream = clientStream(requireHttp2Stream(streamId));
1✔
437
    PerfMark.event("NettyClientHandler.onDataRead", stream.tag());
1✔
438
    stream.transportDataReceived(data, endOfStream);
1✔
439
    if (keepAliveManager != null) {
1✔
440
      keepAliveManager.onDataReceived();
1✔
441
    }
442
  }
1✔
443

444
  /**
445
   * Handler for an inbound HTTP/2 RST_STREAM frame, terminating a stream.
446
   */
447
  private void onRstStreamRead(int streamId, long errorCode) {
448
    NettyClientStream.TransportState stream = clientStream(connection().stream(streamId));
1✔
449
    if (stream != null) {
1✔
450
      PerfMark.event("NettyClientHandler.onRstStreamRead", stream.tag());
1✔
451
      Status status = statusFromH2Error(null, "RST_STREAM closed stream", errorCode, null);
1✔
452
      stream.transportReportStatus(
1✔
453
          status,
454
          errorCode == Http2Error.REFUSED_STREAM.code()
1✔
455
              ? RpcProgress.REFUSED : RpcProgress.PROCESSED,
1✔
456
          false /*stop delivery*/,
457
          new Metadata());
458
      if (keepAliveManager != null) {
1✔
459
        keepAliveManager.onDataReceived();
1✔
460
      }
461
    }
462
  }
1✔
463

464
  @Override
465
  public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
466
    logger.fine("Network channel being closed by the application.");
1✔
467
    if (ctx.channel().isActive()) { // Ignore notification that the socket was closed
1✔
468
      lifecycleManager.notifyShutdown(
1✔
469
          Status.UNAVAILABLE.withDescription("Transport closed for unknown reason"));
1✔
470
    }
471
    super.close(ctx, promise);
1✔
472
  }
1✔
473

474
  /**
475
   * Handler for the Channel shutting down.
476
   */
477
  @Override
478
  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
479
    try {
480
      logger.fine("Network channel is closed");
1✔
481
      Status status = Status.UNAVAILABLE.withDescription("Network closed for unknown reason");
1✔
482
      lifecycleManager.notifyShutdown(status);
1✔
483
      final Status streamStatus;
484
      if (channelInactiveReason != null) {
1✔
485
        streamStatus = channelInactiveReason;
1✔
486
      } else {
487
        streamStatus = lifecycleManager.getShutdownStatus();
1✔
488
      }
489
      try {
490
        cancelPing(lifecycleManager.getShutdownThrowable());
1✔
491
        // Report status to the application layer for any open streams
492
        connection().forEachActiveStream(new Http2StreamVisitor() {
1✔
493
          @Override
494
          public boolean visit(Http2Stream stream) throws Http2Exception {
495
            NettyClientStream.TransportState clientStream = clientStream(stream);
1✔
496
            if (clientStream != null) {
1✔
497
              clientStream.transportReportStatus(streamStatus, false, new Metadata());
1✔
498
            }
499
            return true;
1✔
500
          }
501
        });
502
      } finally {
503
        lifecycleManager.notifyTerminated(status);
1✔
504
      }
505
    } finally {
506
      // Close any open streams
507
      super.channelInactive(ctx);
1✔
508
      if (keepAliveManager != null) {
1✔
509
        keepAliveManager.onTransportTermination();
1✔
510
      }
511
    }
512
  }
1✔
513

514
  @Override
515
  public void handleProtocolNegotiationCompleted(
516
      Attributes attributes, InternalChannelz.Security securityInfo) {
517
    this.attributes = this.attributes.toBuilder().setAll(attributes).build();
1✔
518
    this.securityInfo = securityInfo;
1✔
519
    super.handleProtocolNegotiationCompleted(attributes, securityInfo);
1✔
520
    writeBufferingAndRemove(ctx().channel());
1✔
521
  }
1✔
522

523
  static void writeBufferingAndRemove(Channel channel) {
524
    checkNotNull(channel, "channel");
1✔
525
    ChannelHandlerContext handlerCtx =
1✔
526
        channel.pipeline().context(WriteBufferingAndExceptionHandler.class);
1✔
527
    if (handlerCtx == null) {
1✔
528
      return;
1✔
529
    }
530
    ((WriteBufferingAndExceptionHandler) handlerCtx.handler()).writeBufferedAndRemove(handlerCtx);
1✔
531
  }
1✔
532

533
  @Override
534
  public Attributes getEagAttributes() {
535
    return eagAttributes;
1✔
536
  }
537

538
  @Override
539
  public String getAuthority() {
540
    return authority;
1✔
541
  }
542

543
  InternalChannelz.Security getSecurityInfo() {
544
    return securityInfo;
1✔
545
  }
546

547
  @Override
548
  protected void onConnectionError(ChannelHandlerContext ctx,  boolean outbound, Throwable cause,
549
      Http2Exception http2Ex) {
550
    logger.log(Level.FINE, "Caught a connection error", cause);
1✔
551
    lifecycleManager.notifyShutdown(Utils.statusFromThrowable(cause));
1✔
552
    // Parent class will shut down the Channel
553
    super.onConnectionError(ctx, outbound, cause, http2Ex);
1✔
554
  }
1✔
555

556
  @Override
557
  protected void onStreamError(ChannelHandlerContext ctx, boolean outbound, Throwable cause,
558
      Http2Exception.StreamException http2Ex) {
559
    // Close the stream with a status that contains the cause.
560
    NettyClientStream.TransportState stream = clientStream(connection().stream(http2Ex.streamId()));
1✔
561
    if (stream != null) {
1✔
562
      stream.transportReportStatus(Utils.statusFromThrowable(cause), false, new Metadata());
1✔
563
    } else {
564
      logger.log(Level.FINE, "Stream error for unknown stream " + http2Ex.streamId(), cause);
1✔
565
    }
566

567
    // Delegate to the base class to send a RST_STREAM.
568
    super.onStreamError(ctx, outbound, cause, http2Ex);
1✔
569
  }
1✔
570

571
  @Override
572
  protected boolean isGracefulShutdownComplete() {
573
    // Only allow graceful shutdown to complete after all pending streams have completed.
574
    return super.isGracefulShutdownComplete()
1✔
575
        && ((StreamBufferingEncoder) encoder()).numBufferedStreams() == 0;
1✔
576
  }
577

578
  /**
579
   * Attempts to create a new stream from the given command. If there are too many active streams,
580
   * the creation request is queued.
581
   */
582
  private void createStream(CreateStreamCommand command, ChannelPromise promise)
583
          throws Exception {
584
    if (lifecycleManager.getShutdownThrowable() != null) {
1✔
585
      command.stream().setNonExistent();
1✔
586
      // The connection is going away (it is really the GOAWAY case),
587
      // just terminate the stream now.
588
      command.stream().transportReportStatus(
1✔
589
          lifecycleManager.getShutdownStatus(), RpcProgress.MISCARRIED, true, new Metadata());
1✔
590
      promise.setFailure(lifecycleManager.getShutdownThrowable());
1✔
591
      return;
1✔
592
    }
593

594
    // Get the stream ID for the new stream.
595
    int streamId;
596
    try {
597
      streamId = incrementAndGetNextStreamId();
1✔
598
    } catch (StatusException e) {
1✔
599
      command.stream().setNonExistent();
1✔
600
      // Stream IDs have been exhausted for this connection. Fail the promise immediately.
601
      promise.setFailure(e);
1✔
602

603
      // Initiate a graceful shutdown if we haven't already.
604
      if (!connection().goAwaySent()) {
1✔
605
        logger.fine("Stream IDs have been exhausted for this connection. "
1✔
606
                + "Initiating graceful shutdown of the connection.");
607
        lifecycleManager.notifyShutdown(e.getStatus());
1✔
608
        close(ctx(), ctx().newPromise());
1✔
609
      }
610
      return;
1✔
611
    }
1✔
612
    if (connection().goAwayReceived()) {
1✔
613
      Status s = abruptGoAwayStatus;
1✔
614
      int maxActiveStreams = connection().local().maxActiveStreams();
1✔
615
      int lastStreamId = connection().local().lastStreamKnownByPeer();
1✔
616
      if (s == null) {
1✔
617
        // Should be impossible, but handle pseudo-gracefully
618
        s = Status.INTERNAL.withDescription(
×
619
            "Failed due to abrupt GOAWAY, but can't find GOAWAY details");
620
      } else if (streamId > lastStreamId) {
1✔
621
        s = s.augmentDescription(
1✔
622
            "stream id: " + streamId + ", GOAWAY Last-Stream-ID:" + lastStreamId);
623
      } else if (connection().local().numActiveStreams() == maxActiveStreams) {
1✔
624
        s = s.augmentDescription("At MAX_CONCURRENT_STREAMS limit. limit: " + maxActiveStreams);
1✔
625
      }
626
      if (streamId > lastStreamId || connection().local().numActiveStreams() == maxActiveStreams) {
1✔
627
        // This should only be reachable during onGoAwayReceived, as otherwise
628
        // getShutdownThrowable() != null
629
        command.stream().setNonExistent();
1✔
630
        command.stream().transportReportStatus(s, RpcProgress.MISCARRIED, true, new Metadata());
1✔
631
        promise.setFailure(s.asRuntimeException());
1✔
632
        return;
1✔
633
      }
634
    }
635

636
    NettyClientStream.TransportState stream = command.stream();
1✔
637
    Http2Headers headers = command.headers();
1✔
638
    stream.setId(streamId);
1✔
639

640
    try (TaskCloseable ignore = PerfMark.traceTask("NettyClientHandler.createStream")) {
1✔
641
      PerfMark.linkIn(command.getLink());
1✔
642
      PerfMark.attachTag(stream.tag());
1✔
643
      createStreamTraced(
1✔
644
          streamId, stream, headers, command.isGet(), command.shouldBeCountedForInUse(), promise);
1✔
645
    }
646
  }
1✔
647

648
  private void createStreamTraced(
649
      final int streamId,
650
      final NettyClientStream.TransportState stream,
651
      final Http2Headers headers,
652
      boolean isGet,
653
      final boolean shouldBeCountedForInUse,
654
      final ChannelPromise promise) {
655
    // Create an intermediate promise so that we can intercept the failure reported back to the
656
    // application.
657
    ChannelPromise tempPromise = ctx().newPromise();
1✔
658
    encoder().writeHeaders(ctx(), streamId, headers, 0, isGet, tempPromise)
1✔
659
        .addListener(new ChannelFutureListener() {
1✔
660
          @Override
661
          public void operationComplete(ChannelFuture future) throws Exception {
662
            if (future.isSuccess()) {
1✔
663
              // The http2Stream will be null in case a stream buffered in the encoder
664
              // was canceled via RST_STREAM.
665
              Http2Stream http2Stream = connection().stream(streamId);
1✔
666
              if (http2Stream != null) {
1✔
667
                stream.getStatsTraceContext().clientOutboundHeaders();
1✔
668
                http2Stream.setProperty(streamKey, stream);
1✔
669

670
                // This delays the in-use state until the I/O completes, which technically may
671
                // be later than we would like.
672
                if (shouldBeCountedForInUse) {
1✔
673
                  inUseState.updateObjectInUse(http2Stream, true);
1✔
674
                }
675

676
                // Attach the client stream to the HTTP/2 stream object as user data.
677
                stream.setHttp2Stream(http2Stream);
1✔
678
              }
679
              // Otherwise, the stream has been cancelled and Netty is sending a
680
              // RST_STREAM frame which causes it to purge pending writes from the
681
              // flow-controller and delete the http2Stream. The stream listener has already
682
              // been notified of cancellation so there is nothing to do.
683

684
              // Just forward on the success status to the original promise.
685
              promise.setSuccess();
1✔
686
            } else {
1✔
687
              Throwable cause = future.cause();
1✔
688
              if (cause instanceof StreamBufferingEncoder.Http2GoAwayException) {
1✔
689
                StreamBufferingEncoder.Http2GoAwayException e =
1✔
690
                    (StreamBufferingEncoder.Http2GoAwayException) cause;
691
                Status status = statusFromH2Error(
1✔
692
                    Status.Code.UNAVAILABLE, "GOAWAY closed buffered stream",
693
                    e.errorCode(), e.debugData());
1✔
694
                cause = status.asRuntimeException();
1✔
695
                stream.transportReportStatus(status, RpcProgress.MISCARRIED, true, new Metadata());
1✔
696
              } else if (cause instanceof StreamBufferingEncoder.Http2ChannelClosedException) {
1✔
697
                Status status = lifecycleManager.getShutdownStatus();
1✔
698
                if (status == null) {
1✔
699
                  status = Status.UNAVAILABLE.withCause(cause)
×
700
                      .withDescription("Connection closed while stream is buffered");
×
701
                }
702
                stream.transportReportStatus(status, RpcProgress.MISCARRIED, true, new Metadata());
1✔
703
              }
704
              promise.setFailure(cause);
1✔
705
            }
706
          }
1✔
707
        });
708
  }
1✔
709

710
  /**
711
   * Cancels this stream.
712
   */
713
  private void cancelStream(ChannelHandlerContext ctx, CancelClientStreamCommand cmd,
714
      ChannelPromise promise) {
715
    NettyClientStream.TransportState stream = cmd.stream();
1✔
716
    try (TaskCloseable ignore = PerfMark.traceTask("NettyClientHandler.cancelStream")) {
1✔
717
      PerfMark.attachTag(stream.tag());
1✔
718
      PerfMark.linkIn(cmd.getLink());
1✔
719
      Status reason = cmd.reason();
1✔
720
      if (reason != null) {
1✔
721
        stream.transportReportStatus(reason, true, new Metadata());
1✔
722
      }
723
      if (!cmd.stream().isNonExistent()) {
1✔
724
        encoder().writeRstStream(ctx, stream.id(), Http2Error.CANCEL.code(), promise);
1✔
725
      } else {
726
        promise.setSuccess();
1✔
727
      }
728
    }
729
  }
1✔
730

731
  /**
732
   * Sends the given GRPC frame for the stream.
733
   */
734
  private void sendGrpcFrame(ChannelHandlerContext ctx, SendGrpcFrameCommand cmd,
735
      ChannelPromise promise) {
736
    try (TaskCloseable ignore = PerfMark.traceTask("NettyClientHandler.sendGrpcFrame")) {
1✔
737
      PerfMark.attachTag(cmd.stream().tag());
1✔
738
      PerfMark.linkIn(cmd.getLink());
1✔
739
      // Call the base class to write the HTTP/2 DATA frame.
740
      // Note: no need to flush since this is handled by the outbound flow controller.
741
      encoder().writeData(ctx, cmd.stream().id(), cmd.content(), 0, cmd.endStream(), promise);
1✔
742
    }
743
  }
1✔
744

745
  private void sendPingFrame(ChannelHandlerContext ctx, SendPingCommand msg,
746
      ChannelPromise promise) {
747
    try (TaskCloseable ignore = PerfMark.traceTask("NettyClientHandler.sendPingFrame")) {
1✔
748
      PerfMark.linkIn(msg.getLink());
1✔
749
      sendPingFrameTraced(ctx, msg, promise);
1✔
750
    }
751
  }
1✔
752

753
  /**
754
   * Sends a PING frame. If a ping operation is already outstanding, the callback in the message is
755
   * registered to be called when the existing operation completes, and no new frame is sent.
756
   */
757
  private void sendPingFrameTraced(ChannelHandlerContext ctx, SendPingCommand msg,
758
      ChannelPromise promise) {
759
    // Don't check lifecycleManager.getShutdownStatus() since we want to allow pings after shutdown
760
    // but before termination. After termination, messages will no longer arrive because the
761
    // pipeline clears all handlers on channel close.
762

763
    PingCallback callback = msg.callback();
1✔
764
    Executor executor = msg.executor();
1✔
765
    // we only allow one outstanding ping at a time, so just add the callback to
766
    // any outstanding operation
767
    if (ping != null) {
1✔
768
      promise.setSuccess();
1✔
769
      ping.addCallback(callback, executor);
1✔
770
      return;
1✔
771
    }
772

773
    // Use a new promise to prevent calling the callback twice on write failure: here and in
774
    // NettyClientTransport.ping(). It may appear strange, but it will behave the same as if
775
    // ping != null above.
776
    promise.setSuccess();
1✔
777
    promise = ctx().newPromise();
1✔
778
    // set outstanding operation
779
    long data = USER_PING_PAYLOAD;
1✔
780
    Stopwatch stopwatch = stopwatchFactory.get();
1✔
781
    stopwatch.start();
1✔
782
    ping = new Http2Ping(data, stopwatch);
1✔
783
    ping.addCallback(callback, executor);
1✔
784
    // and then write the ping
785
    encoder().writePing(ctx, false, USER_PING_PAYLOAD, promise);
1✔
786
    ctx.flush();
1✔
787
    final Http2Ping finalPing = ping;
1✔
788
    promise.addListener(new ChannelFutureListener() {
1✔
789
      @Override
790
      public void operationComplete(ChannelFuture future) throws Exception {
791
        if (future.isSuccess()) {
1✔
792
          transportTracer.reportKeepAliveSent();
1✔
793
        } else {
794
          Throwable cause = future.cause();
×
795
          if (cause instanceof ClosedChannelException) {
×
796
            cause = lifecycleManager.getShutdownThrowable();
×
797
            if (cause == null) {
×
798
              cause = Status.UNKNOWN.withDescription("Ping failed but for unknown reason.")
×
799
                  .withCause(future.cause()).asException();
×
800
            }
801
          }
802
          finalPing.failed(cause);
×
803
          if (ping == finalPing) {
×
804
            ping = null;
×
805
          }
806
        }
807
      }
1✔
808
    });
809
  }
1✔
810

811
  private void gracefulClose(ChannelHandlerContext ctx, GracefulCloseCommand msg,
812
      ChannelPromise promise) throws Exception {
813
    lifecycleManager.notifyShutdown(msg.getStatus());
1✔
814
    // Explicitly flush to create any buffered streams before sending GOAWAY.
815
    // TODO(ejona): determine if the need to flush is a bug in Netty
816
    flush(ctx);
1✔
817
    close(ctx, promise);
1✔
818
  }
1✔
819

820
  private void forcefulClose(final ChannelHandlerContext ctx, final ForcefulCloseCommand msg,
821
      ChannelPromise promise) throws Exception {
822
    connection().forEachActiveStream(new Http2StreamVisitor() {
1✔
823
      @Override
824
      public boolean visit(Http2Stream stream) throws Http2Exception {
825
        NettyClientStream.TransportState clientStream = clientStream(stream);
1✔
826
        Tag tag = clientStream != null ? clientStream.tag() : PerfMark.createTag();
1✔
827
        try (TaskCloseable ignore = PerfMark.traceTask("NettyClientHandler.forcefulClose")) {
1✔
828
          PerfMark.linkIn(msg.getLink());
1✔
829
          PerfMark.attachTag(tag);
1✔
830
          if (clientStream != null) {
1✔
831
            clientStream.transportReportStatus(msg.getStatus(), true, new Metadata());
1✔
832
            resetStream(ctx, stream.id(), Http2Error.CANCEL.code(), ctx.newPromise());
1✔
833
          }
834
          stream.close();
1✔
835
          return true;
1✔
836
        }
837
      }
838
    });
839
    close(ctx, promise);
1✔
840
  }
1✔
841

842
  /**
843
   * Handler for a GOAWAY being received. Fails any streams created after the
844
   * last known stream. May only be called during a read.
845
   */
846
  private void goingAway(long errorCode, byte[] debugData) {
847
    Status finalStatus = statusFromH2Error(
1✔
848
        Status.Code.UNAVAILABLE, "GOAWAY shut down transport", errorCode, debugData);
849
    lifecycleManager.notifyGracefulShutdown(finalStatus);
1✔
850
    abruptGoAwayStatus = statusFromH2Error(
1✔
851
        Status.Code.UNAVAILABLE, "Abrupt GOAWAY closed unsent stream", errorCode, debugData);
852
    // While this _should_ be UNAVAILABLE, Netty uses the wrong stream id in the GOAWAY when it
853
    // fails streams due to HPACK failures (e.g., header list too large). To be more conservative,
854
    // we assume any sent streams may be related to the GOAWAY. This should rarely impact users
855
    // since the main time servers should use abrupt GOAWAYs is if there is a protocol error, and if
856
    // there wasn't a protocol error the error code was probably NO_ERROR which is mapped to
857
    // UNAVAILABLE. https://github.com/netty/netty/issues/10670
858
    final Status abruptGoAwayStatusConservative = statusFromH2Error(
1✔
859
        null, "Abrupt GOAWAY closed sent stream", errorCode, debugData);
860
    final boolean mayBeHittingNettyBug = errorCode != Http2Error.NO_ERROR.code();
1✔
861
    // Try to allocate as many in-flight streams as possible, to reduce race window of
862
    // https://github.com/grpc/grpc-java/issues/2562 . To be of any help, the server has to
863
    // gracefully shut down the connection with two GOAWAYs. gRPC servers generally send a PING
864
    // after the first GOAWAY, so they can very precisely detect when the GOAWAY has been
865
    // processed and thus this processing must be in-line before processing additional reads.
866

867
    // This can cause reentrancy, but should be minor since it is normal to handle writes in
868
    // response to a read. Also, the call stack is rather shallow at this point
869
    clientWriteQueue.drainNow();
1✔
870
    if (lifecycleManager.notifyShutdown(finalStatus)) {
1✔
871
      // This is for the only RPCs that are actually covered by the GOAWAY error code. All other
872
      // RPCs were not observed by the remote and so should be UNAVAILABLE.
873
      channelInactiveReason = statusFromH2Error(
1✔
874
          null, "Connection closed after GOAWAY", errorCode, debugData);
875
    }
876

877
    final int lastKnownStream = connection().local().lastStreamKnownByPeer();
1✔
878
    try {
879
      connection().forEachActiveStream(new Http2StreamVisitor() {
1✔
880
        @Override
881
        public boolean visit(Http2Stream stream) throws Http2Exception {
882
          if (stream.id() > lastKnownStream) {
1✔
883
            NettyClientStream.TransportState clientStream = clientStream(stream);
1✔
884
            if (clientStream != null) {
1✔
885
              // RpcProgress _should_ be REFUSED, but are being conservative. See comment for
886
              // abruptGoAwayStatusConservative. This does reduce our ability to perform transparent
887
              // retries, but only if something else caused a connection failure.
888
              RpcProgress progress = mayBeHittingNettyBug
1✔
889
                  ? RpcProgress.PROCESSED
1✔
890
                  : RpcProgress.REFUSED;
1✔
891
              clientStream.transportReportStatus(
1✔
892
                  abruptGoAwayStatusConservative, progress, false, new Metadata());
893
            }
894
            stream.close();
1✔
895
          }
896
          return true;
1✔
897
        }
898
      });
899
    } catch (Http2Exception e) {
×
900
      throw new RuntimeException(e);
×
901
    }
1✔
902
  }
1✔
903

904
  private void cancelPing(Throwable t) {
905
    if (ping != null) {
1✔
906
      ping.failed(t);
1✔
907
      ping = null;
1✔
908
    }
909
  }
1✔
910

911
  /** If {@code statusCode} is non-null, it will be used instead of the http2 error code mapping. */
912
  private Status statusFromH2Error(
913
      Status.Code statusCode, String context, long errorCode, byte[] debugData) {
914
    Status status = GrpcUtil.Http2Error.statusForCode(errorCode);
1✔
915
    if (statusCode == null) {
1✔
916
      statusCode = status.getCode();
1✔
917
    }
918
    String debugString = "";
1✔
919
    if (debugData != null && debugData.length > 0) {
1✔
920
      // If a debug message was provided, use it.
921
      debugString = ", debug data: " + new String(debugData, UTF_8);
1✔
922
    }
923
    return statusCode.toStatus()
1✔
924
        .withDescription(context + ". " + status.getDescription() + debugString);
1✔
925
  }
926

927
  /**
928
   * Gets the client stream associated to the given HTTP/2 stream object.
929
   */
930
  private NettyClientStream.TransportState clientStream(Http2Stream stream) {
931
    return stream == null ? null : (NettyClientStream.TransportState) stream.getProperty(streamKey);
1✔
932
  }
933

934
  private int incrementAndGetNextStreamId() throws StatusException {
935
    int nextStreamId = connection().local().incrementAndGetNextStreamId();
1✔
936
    if (nextStreamId < 0) {
1✔
937
      logger.fine("Stream IDs have been exhausted for this connection. "
1✔
938
              + "Initiating graceful shutdown of the connection.");
939
      throw EXHAUSTED_STREAMS_STATUS.asException();
1✔
940
    }
941
    return nextStreamId;
1✔
942
  }
943

944
  private Http2Stream requireHttp2Stream(int streamId) {
945
    Http2Stream stream = connection().stream(streamId);
1✔
946
    if (stream == null) {
1✔
947
      // This should never happen.
948
      throw new AssertionError("Stream does not exist: " + streamId);
×
949
    }
950
    return stream;
1✔
951
  }
952

953
  private class FrameListener extends Http2FrameAdapter {
1✔
954
    private boolean firstSettings = true;
1✔
955

956
    @Override
957
    public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) {
958
      if (firstSettings) {
1✔
959
        firstSettings = false;
1✔
960
        attributes = lifecycleManager.filterAttributes(attributes);
1✔
961
        lifecycleManager.notifyReady();
1✔
962
      }
963
    }
1✔
964

965
    @Override
966
    public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
967
        boolean endOfStream) throws Http2Exception {
968
      NettyClientHandler.this.onDataRead(streamId, data, padding, endOfStream);
1✔
969
      return padding;
1✔
970
    }
971

972
    @Override
973
    public void onHeadersRead(ChannelHandlerContext ctx,
974
        int streamId,
975
        Http2Headers headers,
976
        int streamDependency,
977
        short weight,
978
        boolean exclusive,
979
        int padding,
980
        boolean endStream) throws Http2Exception {
981
      NettyClientHandler.this.onHeadersRead(streamId, headers, endStream);
1✔
982
    }
1✔
983

984
    @Override
985
    public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode)
986
        throws Http2Exception {
987
      NettyClientHandler.this.onRstStreamRead(streamId, errorCode);
1✔
988
    }
1✔
989

990
    @Override
991
    public void onPingAckRead(ChannelHandlerContext ctx, long ackPayload) throws Http2Exception {
992
      Http2Ping p = ping;
1✔
993
      if (ackPayload == flowControlPing().payload()) {
1✔
994
        flowControlPing().updateWindow();
1✔
995
        logger.log(Level.FINE, "Window: {0}",
1✔
996
            decoder().flowController().initialWindowSize(connection().connectionStream()));
1✔
997
      } else if (p != null) {
1✔
998
        if (p.payload() == ackPayload) {
1✔
999
          p.complete();
1✔
1000
          ping = null;
1✔
1001
        } else {
1002
          logger.log(Level.WARNING,
1✔
1003
              "Received unexpected ping ack. Expecting {0}, got {1}",
1004
              new Object[] {p.payload(), ackPayload});
1✔
1005
        }
1006
      } else {
1007
        logger.warning("Received unexpected ping ack. No ping outstanding");
×
1008
      }
1009
      if (keepAliveManager != null) {
1✔
1010
        keepAliveManager.onDataReceived();
1✔
1011
      }
1012
    }
1✔
1013

1014
    @Override
1015
    public void onPingRead(ChannelHandlerContext ctx, long data) throws Http2Exception {
1016
      if (keepAliveManager != null) {
1✔
1017
        keepAliveManager.onDataReceived();
×
1018
      }
1019
    }
1✔
1020
  }
1021

1022
  private static class PingCountingFrameWriter extends DecoratingHttp2FrameWriter
1023
      implements AbstractNettyHandler.PingLimiter {
1024
    private int pingCount;
1025

1026
    public PingCountingFrameWriter(Http2FrameWriter delegate) {
1027
      super(delegate);
1✔
1028
    }
1✔
1029

1030
    @Override
1031
    public boolean isPingAllowed() {
1032
      // "3 strikes" may cause the server to complain, so we limit ourselves to 2 or below.
1033
      return pingCount < 2;
1✔
1034
    }
1035

1036
    @Override
1037
    public ChannelFuture writeHeaders(
1038
        ChannelHandlerContext ctx, int streamId, Http2Headers headers,
1039
        int padding, boolean endStream, ChannelPromise promise) {
1040
      pingCount = 0;
×
1041
      return super.writeHeaders(ctx, streamId, headers, padding, endStream, promise);
×
1042
    }
1043

1044
    @Override
1045
    public ChannelFuture writeHeaders(
1046
        ChannelHandlerContext ctx, int streamId, Http2Headers headers,
1047
        int streamDependency, short weight, boolean exclusive,
1048
        int padding, boolean endStream, ChannelPromise promise) {
1049
      pingCount = 0;
1✔
1050
      return super.writeHeaders(ctx, streamId, headers, streamDependency, weight, exclusive,
1✔
1051
          padding, endStream, promise);
1052
    }
1053

1054
    @Override
1055
    public ChannelFuture writeWindowUpdate(
1056
        ChannelHandlerContext ctx, int streamId, int windowSizeIncrement, ChannelPromise promise) {
1057
      pingCount = 0;
1✔
1058
      return super.writeWindowUpdate(ctx, streamId, windowSizeIncrement, promise);
1✔
1059
    }
1060

1061
    @Override
1062
    public ChannelFuture writePing(
1063
        ChannelHandlerContext ctx, boolean ack, long data, ChannelPromise promise) {
1064
      if (!ack) {
1✔
1065
        pingCount++;
1✔
1066
      }
1067
      return super.writePing(ctx, ack, data, promise);
1✔
1068
    }
1069

1070
    @Override
1071
    public ChannelFuture writeData(
1072
        ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endStream,
1073
        ChannelPromise promise) {
1074
      if (data.isReadable()) {
1✔
1075
        pingCount = 0;
1✔
1076
      }
1077
      return super.writeData(ctx, streamId, data, padding, endStream, promise);
1✔
1078
    }
1079
  }
1080
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc