• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #18898

14 Nov 2023 07:09PM UTC coverage: 88.219% (+0.001%) from 88.218%
#18898

push

github

web-flow
netty: disable huffman coding in headers (#10563)

 Huffman in the datacenter doesn't add much value in the common cases.  It could be useful to turn on huffman based on the connection latency (say, >10ms means "assume cross-datacenter") but the Netty API doesn't lend it to that. The savings here aren't huge and it is expensive; the table provides the biggest savings.

30341 of 34393 relevant lines covered (88.22%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.51
/../netty/src/main/java/io/grpc/netty/NettyClientHandler.java
1
/*
2
 * Copyright 2014 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.netty;
18

19
import static io.netty.handler.codec.http2.DefaultHttp2LocalFlowController.DEFAULT_WINDOW_UPDATE_RATIO;
20
import static io.netty.util.CharsetUtil.UTF_8;
21
import static io.netty.util.internal.ObjectUtil.checkNotNull;
22

23
import com.google.common.annotations.VisibleForTesting;
24
import com.google.common.base.Preconditions;
25
import com.google.common.base.Stopwatch;
26
import com.google.common.base.Supplier;
27
import com.google.common.base.Ticker;
28
import io.grpc.Attributes;
29
import io.grpc.ChannelLogger;
30
import io.grpc.InternalChannelz;
31
import io.grpc.Metadata;
32
import io.grpc.Status;
33
import io.grpc.StatusException;
34
import io.grpc.internal.ClientStreamListener.RpcProgress;
35
import io.grpc.internal.ClientTransport.PingCallback;
36
import io.grpc.internal.GrpcAttributes;
37
import io.grpc.internal.GrpcUtil;
38
import io.grpc.internal.Http2Ping;
39
import io.grpc.internal.InUseStateAggregator;
40
import io.grpc.internal.KeepAliveManager;
41
import io.grpc.internal.TransportTracer;
42
import io.grpc.netty.GrpcHttp2HeadersUtils.GrpcHttp2ClientHeadersDecoder;
43
import io.netty.buffer.ByteBuf;
44
import io.netty.buffer.ByteBufUtil;
45
import io.netty.buffer.Unpooled;
46
import io.netty.channel.Channel;
47
import io.netty.channel.ChannelFuture;
48
import io.netty.channel.ChannelFutureListener;
49
import io.netty.channel.ChannelHandlerContext;
50
import io.netty.channel.ChannelPromise;
51
import io.netty.handler.codec.http2.DecoratingHttp2FrameWriter;
52
import io.netty.handler.codec.http2.DefaultHttp2Connection;
53
import io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder;
54
import io.netty.handler.codec.http2.DefaultHttp2ConnectionEncoder;
55
import io.netty.handler.codec.http2.DefaultHttp2FrameReader;
56
import io.netty.handler.codec.http2.DefaultHttp2FrameWriter;
57
import io.netty.handler.codec.http2.DefaultHttp2HeadersEncoder;
58
import io.netty.handler.codec.http2.DefaultHttp2LocalFlowController;
59
import io.netty.handler.codec.http2.DefaultHttp2RemoteFlowController;
60
import io.netty.handler.codec.http2.Http2CodecUtil;
61
import io.netty.handler.codec.http2.Http2Connection;
62
import io.netty.handler.codec.http2.Http2ConnectionAdapter;
63
import io.netty.handler.codec.http2.Http2ConnectionDecoder;
64
import io.netty.handler.codec.http2.Http2ConnectionEncoder;
65
import io.netty.handler.codec.http2.Http2Error;
66
import io.netty.handler.codec.http2.Http2Exception;
67
import io.netty.handler.codec.http2.Http2FrameAdapter;
68
import io.netty.handler.codec.http2.Http2FrameLogger;
69
import io.netty.handler.codec.http2.Http2FrameReader;
70
import io.netty.handler.codec.http2.Http2FrameWriter;
71
import io.netty.handler.codec.http2.Http2Headers;
72
import io.netty.handler.codec.http2.Http2HeadersDecoder;
73
import io.netty.handler.codec.http2.Http2HeadersEncoder;
74
import io.netty.handler.codec.http2.Http2InboundFrameLogger;
75
import io.netty.handler.codec.http2.Http2OutboundFrameLogger;
76
import io.netty.handler.codec.http2.Http2Settings;
77
import io.netty.handler.codec.http2.Http2Stream;
78
import io.netty.handler.codec.http2.Http2StreamVisitor;
79
import io.netty.handler.codec.http2.StreamBufferingEncoder;
80
import io.netty.handler.codec.http2.WeightedFairQueueByteDistributor;
81
import io.netty.handler.logging.LogLevel;
82
import io.perfmark.PerfMark;
83
import io.perfmark.Tag;
84
import io.perfmark.TaskCloseable;
85
import java.nio.channels.ClosedChannelException;
86
import java.util.concurrent.Executor;
87
import java.util.logging.Level;
88
import java.util.logging.Logger;
89
import javax.annotation.Nullable;
90

91
/**
92
 * Client-side Netty handler for GRPC processing. All event handlers are executed entirely within
93
 * the context of the Netty Channel thread.
94
 */
95
class NettyClientHandler extends AbstractNettyHandler {
96
  private static final Logger logger = Logger.getLogger(NettyClientHandler.class.getName());
1✔
97

98
  /**
99
   * A message that simply passes through the channel without any real processing. It is useful to
100
   * check if buffers have been drained and test the health of the channel in a single operation.
101
   */
102
  static final Object NOOP_MESSAGE = new Object();
1✔
103

104
  /**
105
   * Status used when the transport has exhausted the number of streams.
106
   */
107
  private static final Status EXHAUSTED_STREAMS_STATUS =
1✔
108
          Status.UNAVAILABLE.withDescription("Stream IDs have been exhausted");
1✔
109
  private static final long USER_PING_PAYLOAD = 1111;
110

111
  private final Http2Connection.PropertyKey streamKey;
112
  private final ClientTransportLifecycleManager lifecycleManager;
113
  private final KeepAliveManager keepAliveManager;
114
  // Returns new unstarted stopwatches
115
  private final Supplier<Stopwatch> stopwatchFactory;
116
  private final TransportTracer transportTracer;
117
  private final Attributes eagAttributes;
118
  private final String authority;
119
  private final InUseStateAggregator<Http2Stream> inUseState =
1✔
120
      new InUseStateAggregator<Http2Stream>() {
1✔
121
        @Override
122
        protected void handleInUse() {
123
          lifecycleManager.notifyInUse(true);
1✔
124
        }
1✔
125

126
        @Override
127
        protected void handleNotInUse() {
128
          lifecycleManager.notifyInUse(false);
1✔
129
        }
1✔
130
      };
131

132
  private WriteQueue clientWriteQueue;
133
  private Http2Ping ping;
134
  private Attributes attributes;
135
  private InternalChannelz.Security securityInfo;
136
  private Status abruptGoAwayStatus;
137
  private Status channelInactiveReason;
138

139
  static NettyClientHandler newHandler(
140
      ClientTransportLifecycleManager lifecycleManager,
141
      @Nullable KeepAliveManager keepAliveManager,
142
      boolean autoFlowControl,
143
      int flowControlWindow,
144
      int maxHeaderListSize,
145
      Supplier<Stopwatch> stopwatchFactory,
146
      Runnable tooManyPingsRunnable,
147
      TransportTracer transportTracer,
148
      Attributes eagAttributes,
149
      String authority,
150
      ChannelLogger negotiationLogger,
151
      Ticker ticker) {
152
    Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive");
1✔
153
    Http2HeadersDecoder headersDecoder = new GrpcHttp2ClientHeadersDecoder(maxHeaderListSize);
1✔
154
    Http2FrameReader frameReader = new DefaultHttp2FrameReader(headersDecoder);
1✔
155
    Http2HeadersEncoder encoder = new DefaultHttp2HeadersEncoder(
1✔
156
        Http2HeadersEncoder.NEVER_SENSITIVE, false, 16, Integer.MAX_VALUE);
157
    Http2FrameWriter frameWriter = new DefaultHttp2FrameWriter(encoder);
1✔
158
    Http2Connection connection = new DefaultHttp2Connection(false);
1✔
159
    WeightedFairQueueByteDistributor dist = new WeightedFairQueueByteDistributor(connection);
1✔
160
    dist.allocationQuantum(16 * 1024); // Make benchmarks fast again.
1✔
161
    DefaultHttp2RemoteFlowController controller =
1✔
162
        new DefaultHttp2RemoteFlowController(connection, dist);
163
    connection.remote().flowController(controller);
1✔
164

165
    return newHandler(
1✔
166
        connection,
167
        frameReader,
168
        frameWriter,
169
        lifecycleManager,
170
        keepAliveManager,
171
        autoFlowControl,
172
        flowControlWindow,
173
        maxHeaderListSize,
174
        stopwatchFactory,
175
        tooManyPingsRunnable,
176
        transportTracer,
177
        eagAttributes,
178
        authority,
179
        negotiationLogger,
180
        ticker);
181
  }
182

183
  @VisibleForTesting
184
  static NettyClientHandler newHandler(
185
      final Http2Connection connection,
186
      Http2FrameReader frameReader,
187
      Http2FrameWriter frameWriter,
188
      ClientTransportLifecycleManager lifecycleManager,
189
      KeepAliveManager keepAliveManager,
190
      boolean autoFlowControl,
191
      int flowControlWindow,
192
      int maxHeaderListSize,
193
      Supplier<Stopwatch> stopwatchFactory,
194
      Runnable tooManyPingsRunnable,
195
      TransportTracer transportTracer,
196
      Attributes eagAttributes,
197
      String authority,
198
      ChannelLogger negotiationLogger,
199
      Ticker ticker) {
200
    Preconditions.checkNotNull(connection, "connection");
1✔
201
    Preconditions.checkNotNull(frameReader, "frameReader");
1✔
202
    Preconditions.checkNotNull(lifecycleManager, "lifecycleManager");
1✔
203
    Preconditions.checkArgument(flowControlWindow > 0, "flowControlWindow must be positive");
1✔
204
    Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive");
1✔
205
    Preconditions.checkNotNull(stopwatchFactory, "stopwatchFactory");
1✔
206
    Preconditions.checkNotNull(tooManyPingsRunnable, "tooManyPingsRunnable");
1✔
207
    Preconditions.checkNotNull(eagAttributes, "eagAttributes");
1✔
208
    Preconditions.checkNotNull(authority, "authority");
1✔
209

210
    Http2FrameLogger frameLogger = new Http2FrameLogger(LogLevel.DEBUG, NettyClientHandler.class);
1✔
211
    frameReader = new Http2InboundFrameLogger(frameReader, frameLogger);
1✔
212
    frameWriter = new Http2OutboundFrameLogger(frameWriter, frameLogger);
1✔
213

214
    PingCountingFrameWriter pingCounter;
215
    frameWriter = pingCounter = new PingCountingFrameWriter(frameWriter);
1✔
216

217
    StreamBufferingEncoder encoder =
1✔
218
        new StreamBufferingEncoder(
219
            new DefaultHttp2ConnectionEncoder(connection, frameWriter));
220

221
    // Create the local flow controller configured to auto-refill the connection window.
222
    connection.local().flowController(
1✔
223
        new DefaultHttp2LocalFlowController(connection, DEFAULT_WINDOW_UPDATE_RATIO, true));
224

225
    Http2ConnectionDecoder decoder = new DefaultHttp2ConnectionDecoder(connection, encoder,
1✔
226
        frameReader);
227

228
    transportTracer.setFlowControlWindowReader(new Utils.FlowControlReader(connection));
1✔
229

230
    Http2Settings settings = new Http2Settings();
1✔
231
    settings.pushEnabled(false);
1✔
232
    settings.initialWindowSize(flowControlWindow);
1✔
233
    settings.maxConcurrentStreams(0);
1✔
234
    settings.maxHeaderListSize(maxHeaderListSize);
1✔
235

236
    return new NettyClientHandler(
1✔
237
        decoder,
238
        encoder,
239
        settings,
240
        negotiationLogger,
241
        lifecycleManager,
242
        keepAliveManager,
243
        stopwatchFactory,
244
        tooManyPingsRunnable,
245
        transportTracer,
246
        eagAttributes,
247
        authority,
248
        autoFlowControl,
249
        pingCounter,
250
        ticker);
251
  }
252

253
  private NettyClientHandler(
254
      Http2ConnectionDecoder decoder,
255
      Http2ConnectionEncoder encoder,
256
      Http2Settings settings,
257
      ChannelLogger negotiationLogger,
258
      ClientTransportLifecycleManager lifecycleManager,
259
      KeepAliveManager keepAliveManager,
260
      Supplier<Stopwatch> stopwatchFactory,
261
      final Runnable tooManyPingsRunnable,
262
      TransportTracer transportTracer,
263
      Attributes eagAttributes,
264
      String authority,
265
      boolean autoFlowControl,
266
      PingLimiter pingLimiter,
267
      Ticker ticker) {
268
    super(/* channelUnused= */ null, decoder, encoder, settings,
1✔
269
        negotiationLogger, autoFlowControl, pingLimiter, ticker);
270
    this.lifecycleManager = lifecycleManager;
1✔
271
    this.keepAliveManager = keepAliveManager;
1✔
272
    this.stopwatchFactory = stopwatchFactory;
1✔
273
    this.transportTracer = Preconditions.checkNotNull(transportTracer);
1✔
274
    this.eagAttributes = eagAttributes;
1✔
275
    this.authority = authority;
1✔
276
    this.attributes = Attributes.newBuilder()
1✔
277
        .set(GrpcAttributes.ATTR_CLIENT_EAG_ATTRS, eagAttributes).build();
1✔
278

279
    // Set the frame listener on the decoder.
280
    decoder().frameListener(new FrameListener());
1✔
281

282
    Http2Connection connection = encoder.connection();
1✔
283
    streamKey = connection.newKey();
1✔
284

285
    connection.addListener(new Http2ConnectionAdapter() {
1✔
286
      @Override
287
      public void onGoAwayReceived(int lastStreamId, long errorCode, ByteBuf debugData) {
288
        byte[] debugDataBytes = ByteBufUtil.getBytes(debugData);
1✔
289
        goingAway(errorCode, debugDataBytes);
1✔
290
        if (errorCode == Http2Error.ENHANCE_YOUR_CALM.code()) {
1✔
291
          String data = new String(debugDataBytes, UTF_8);
1✔
292
          logger.log(
1✔
293
              Level.WARNING, "Received GOAWAY with ENHANCE_YOUR_CALM. Debug data: {0}", data);
294
          if ("too_many_pings".equals(data)) {
1✔
295
            tooManyPingsRunnable.run();
1✔
296
          }
297
        }
298
      }
1✔
299

300
      @Override
301
      public void onStreamActive(Http2Stream stream) {
302
        if (connection().numActiveStreams() == 1
1✔
303
            && NettyClientHandler.this.keepAliveManager != null) {
1✔
304
          NettyClientHandler.this.keepAliveManager.onTransportActive();
1✔
305
        }
306
      }
1✔
307

308
      @Override
309
      public void onStreamClosed(Http2Stream stream) {
310
        // Although streams with CALL_OPTIONS_RPC_OWNED_BY_BALANCER are not marked as "in-use" in
311
        // the first place, we don't propagate that option here, and it's safe to reset the in-use
312
        // state for them, which will be a cheap no-op.
313
        inUseState.updateObjectInUse(stream, false);
1✔
314
        if (connection().numActiveStreams() == 0
1✔
315
            && NettyClientHandler.this.keepAliveManager != null) {
1✔
316
          NettyClientHandler.this.keepAliveManager.onTransportIdle();
1✔
317
        }
318
      }
1✔
319
    });
320
  }
1✔
321

322
  /**
323
   * The protocol negotiation attributes, available once the protocol negotiation completes;
324
   * otherwise returns {@code Attributes.EMPTY}.
325
   */
326
  Attributes getAttributes() {
327
    return attributes;
1✔
328
  }
329

330
  /**
331
   * Handler for commands sent from the stream.
332
   */
333
  @Override
334
  public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
335
          throws Exception {
336
    if (msg instanceof CreateStreamCommand) {
1✔
337
      createStream((CreateStreamCommand) msg, promise);
1✔
338
    } else if (msg instanceof SendGrpcFrameCommand) {
1✔
339
      sendGrpcFrame(ctx, (SendGrpcFrameCommand) msg, promise);
1✔
340
    } else if (msg instanceof CancelClientStreamCommand) {
1✔
341
      cancelStream(ctx, (CancelClientStreamCommand) msg, promise);
1✔
342
    } else if (msg instanceof SendPingCommand) {
1✔
343
      sendPingFrame(ctx, (SendPingCommand) msg, promise);
1✔
344
    } else if (msg instanceof GracefulCloseCommand) {
1✔
345
      gracefulClose(ctx, (GracefulCloseCommand) msg, promise);
1✔
346
    } else if (msg instanceof ForcefulCloseCommand) {
1✔
347
      forcefulClose(ctx, (ForcefulCloseCommand) msg, promise);
1✔
348
    } else if (msg == NOOP_MESSAGE) {
1✔
349
      ctx.write(Unpooled.EMPTY_BUFFER, promise);
1✔
350
    } else {
351
      throw new AssertionError("Write called for unexpected type: " + msg.getClass().getName());
×
352
    }
353
  }
1✔
354

355
  void startWriteQueue(Channel channel) {
356
    clientWriteQueue = new WriteQueue(channel);
1✔
357
  }
1✔
358

359
  WriteQueue getWriteQueue() {
360
    return clientWriteQueue;
1✔
361
  }
362

363
  ClientTransportLifecycleManager getLifecycleManager() {
364
    return lifecycleManager;
1✔
365
  }
366

367
  /**
368
   * Returns the given processed bytes back to inbound flow control.
369
   */
370
  void returnProcessedBytes(Http2Stream stream, int bytes) {
371
    try {
372
      decoder().flowController().consumeBytes(stream, bytes);
1✔
373
    } catch (Http2Exception e) {
×
374
      throw new RuntimeException(e);
×
375
    }
1✔
376
  }
1✔
377

378
  private void onHeadersRead(int streamId, Http2Headers headers, boolean endStream) {
379
    // Stream 1 is reserved for the Upgrade response, so we should ignore its headers here:
380
    if (streamId != Http2CodecUtil.HTTP_UPGRADE_STREAM_ID) {
1✔
381
      NettyClientStream.TransportState stream = clientStream(requireHttp2Stream(streamId));
1✔
382
      PerfMark.event("NettyClientHandler.onHeadersRead", stream.tag());
1✔
383
      stream.transportHeadersReceived(headers, endStream);
1✔
384
    }
385

386
    if (keepAliveManager != null) {
1✔
387
      keepAliveManager.onDataReceived();
1✔
388
    }
389
  }
1✔
390

391
  /**
392
   * Handler for an inbound HTTP/2 DATA frame.
393
   */
394
  private void onDataRead(int streamId, ByteBuf data, int padding, boolean endOfStream) {
395
    flowControlPing().onDataRead(data.readableBytes(), padding);
1✔
396
    NettyClientStream.TransportState stream = clientStream(requireHttp2Stream(streamId));
1✔
397
    PerfMark.event("NettyClientHandler.onDataRead", stream.tag());
1✔
398
    stream.transportDataReceived(data, endOfStream);
1✔
399
    if (keepAliveManager != null) {
1✔
400
      keepAliveManager.onDataReceived();
1✔
401
    }
402
  }
1✔
403

404
  /**
405
   * Handler for an inbound HTTP/2 RST_STREAM frame, terminating a stream.
406
   */
407
  private void onRstStreamRead(int streamId, long errorCode) {
408
    NettyClientStream.TransportState stream = clientStream(connection().stream(streamId));
1✔
409
    if (stream != null) {
1✔
410
      PerfMark.event("NettyClientHandler.onRstStreamRead", stream.tag());
1✔
411
      Status status = statusFromH2Error(null, "RST_STREAM closed stream", errorCode, null);
1✔
412
      stream.transportReportStatus(
1✔
413
          status,
414
          errorCode == Http2Error.REFUSED_STREAM.code()
1✔
415
              ? RpcProgress.REFUSED : RpcProgress.PROCESSED,
1✔
416
          false /*stop delivery*/,
417
          new Metadata());
418
      if (keepAliveManager != null) {
1✔
419
        keepAliveManager.onDataReceived();
1✔
420
      }
421
    }
422
  }
1✔
423

424
  @Override
425
  public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
426
    logger.fine("Network channel being closed by the application.");
1✔
427
    if (ctx.channel().isActive()) { // Ignore notification that the socket was closed
1✔
428
      lifecycleManager.notifyShutdown(
1✔
429
          Status.UNAVAILABLE.withDescription("Transport closed for unknown reason"));
1✔
430
    }
431
    super.close(ctx, promise);
1✔
432
  }
1✔
433

434
  /**
435
   * Handler for the Channel shutting down.
436
   */
437
  @Override
438
  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
439
    try {
440
      logger.fine("Network channel is closed");
1✔
441
      Status status = Status.UNAVAILABLE.withDescription("Network closed for unknown reason");
1✔
442
      lifecycleManager.notifyShutdown(status);
1✔
443
      final Status streamStatus;
444
      if (channelInactiveReason != null) {
1✔
445
        streamStatus = channelInactiveReason;
1✔
446
      } else {
447
        streamStatus = lifecycleManager.getShutdownStatus();
1✔
448
      }
449
      try {
450
        cancelPing(lifecycleManager.getShutdownThrowable());
1✔
451
        // Report status to the application layer for any open streams
452
        connection().forEachActiveStream(new Http2StreamVisitor() {
1✔
453
          @Override
454
          public boolean visit(Http2Stream stream) throws Http2Exception {
455
            NettyClientStream.TransportState clientStream = clientStream(stream);
1✔
456
            if (clientStream != null) {
1✔
457
              clientStream.transportReportStatus(streamStatus, false, new Metadata());
1✔
458
            }
459
            return true;
1✔
460
          }
461
        });
462
      } finally {
463
        lifecycleManager.notifyTerminated(status);
1✔
464
      }
465
    } finally {
466
      // Close any open streams
467
      super.channelInactive(ctx);
1✔
468
      if (keepAliveManager != null) {
1✔
469
        keepAliveManager.onTransportTermination();
1✔
470
      }
471
    }
472
  }
1✔
473

474
  @Override
475
  public void handleProtocolNegotiationCompleted(
476
      Attributes attributes, InternalChannelz.Security securityInfo) {
477
    this.attributes = this.attributes.toBuilder().setAll(attributes).build();
1✔
478
    this.securityInfo = securityInfo;
1✔
479
    super.handleProtocolNegotiationCompleted(attributes, securityInfo);
1✔
480
    writeBufferingAndRemove(ctx().channel());
1✔
481
  }
1✔
482

483
  static void writeBufferingAndRemove(Channel channel) {
484
    checkNotNull(channel, "channel");
1✔
485
    ChannelHandlerContext handlerCtx =
1✔
486
        channel.pipeline().context(WriteBufferingAndExceptionHandler.class);
1✔
487
    if (handlerCtx == null) {
1✔
488
      return;
1✔
489
    }
490
    ((WriteBufferingAndExceptionHandler) handlerCtx.handler()).writeBufferedAndRemove(handlerCtx);
1✔
491
  }
1✔
492

493
  @Override
494
  public Attributes getEagAttributes() {
495
    return eagAttributes;
1✔
496
  }
497

498
  @Override
499
  public String getAuthority() {
500
    return authority;
1✔
501
  }
502

503
  InternalChannelz.Security getSecurityInfo() {
504
    return securityInfo;
1✔
505
  }
506

507
  @Override
508
  protected void onConnectionError(ChannelHandlerContext ctx,  boolean outbound, Throwable cause,
509
      Http2Exception http2Ex) {
510
    logger.log(Level.FINE, "Caught a connection error", cause);
1✔
511
    lifecycleManager.notifyShutdown(Utils.statusFromThrowable(cause));
1✔
512
    // Parent class will shut down the Channel
513
    super.onConnectionError(ctx, outbound, cause, http2Ex);
1✔
514
  }
1✔
515

516
  @Override
517
  protected void onStreamError(ChannelHandlerContext ctx, boolean outbound, Throwable cause,
518
      Http2Exception.StreamException http2Ex) {
519
    // Close the stream with a status that contains the cause.
520
    NettyClientStream.TransportState stream = clientStream(connection().stream(http2Ex.streamId()));
1✔
521
    if (stream != null) {
1✔
522
      stream.transportReportStatus(Utils.statusFromThrowable(cause), false, new Metadata());
1✔
523
    } else {
524
      logger.log(Level.FINE, "Stream error for unknown stream " + http2Ex.streamId(), cause);
1✔
525
    }
526

527
    // Delegate to the base class to send a RST_STREAM.
528
    super.onStreamError(ctx, outbound, cause, http2Ex);
1✔
529
  }
1✔
530

531
  @Override
532
  protected boolean isGracefulShutdownComplete() {
533
    // Only allow graceful shutdown to complete after all pending streams have completed.
534
    return super.isGracefulShutdownComplete()
1✔
535
        && ((StreamBufferingEncoder) encoder()).numBufferedStreams() == 0;
1✔
536
  }
537

538
  /**
539
   * Attempts to create a new stream from the given command. If there are too many active streams,
540
   * the creation request is queued.
541
   */
542
  private void createStream(CreateStreamCommand command, ChannelPromise promise)
543
          throws Exception {
544
    if (lifecycleManager.getShutdownThrowable() != null) {
1✔
545
      command.stream().setNonExistent();
1✔
546
      // The connection is going away (it is really the GOAWAY case),
547
      // just terminate the stream now.
548
      command.stream().transportReportStatus(
1✔
549
          lifecycleManager.getShutdownStatus(), RpcProgress.MISCARRIED, true, new Metadata());
1✔
550
      promise.setFailure(lifecycleManager.getShutdownThrowable());
1✔
551
      return;
1✔
552
    }
553

554
    // Get the stream ID for the new stream.
555
    int streamId;
556
    try {
557
      streamId = incrementAndGetNextStreamId();
1✔
558
    } catch (StatusException e) {
1✔
559
      command.stream().setNonExistent();
1✔
560
      // Stream IDs have been exhausted for this connection. Fail the promise immediately.
561
      promise.setFailure(e);
1✔
562

563
      // Initiate a graceful shutdown if we haven't already.
564
      if (!connection().goAwaySent()) {
1✔
565
        logger.fine("Stream IDs have been exhausted for this connection. "
1✔
566
                + "Initiating graceful shutdown of the connection.");
567
        lifecycleManager.notifyShutdown(e.getStatus());
1✔
568
        close(ctx(), ctx().newPromise());
1✔
569
      }
570
      return;
1✔
571
    }
1✔
572
    if (connection().goAwayReceived()) {
1✔
573
      Status s = abruptGoAwayStatus;
1✔
574
      int maxActiveStreams = connection().local().maxActiveStreams();
1✔
575
      int lastStreamId = connection().local().lastStreamKnownByPeer();
1✔
576
      if (s == null) {
1✔
577
        // Should be impossible, but handle pseudo-gracefully
578
        s = Status.INTERNAL.withDescription(
×
579
            "Failed due to abrupt GOAWAY, but can't find GOAWAY details");
580
      } else if (streamId > lastStreamId) {
1✔
581
        s = s.augmentDescription(
1✔
582
            "stream id: " + streamId + ", GOAWAY Last-Stream-ID:" + lastStreamId);
583
      } else if (connection().local().numActiveStreams() == maxActiveStreams) {
1✔
584
        s = s.augmentDescription("At MAX_CONCURRENT_STREAMS limit. limit: " + maxActiveStreams);
1✔
585
      }
586
      if (streamId > lastStreamId || connection().local().numActiveStreams() == maxActiveStreams) {
1✔
587
        // This should only be reachable during onGoAwayReceived, as otherwise
588
        // getShutdownThrowable() != null
589
        command.stream().setNonExistent();
1✔
590
        command.stream().transportReportStatus(s, RpcProgress.MISCARRIED, true, new Metadata());
1✔
591
        promise.setFailure(s.asRuntimeException());
1✔
592
        return;
1✔
593
      }
594
    }
595

596
    NettyClientStream.TransportState stream = command.stream();
1✔
597
    Http2Headers headers = command.headers();
1✔
598
    stream.setId(streamId);
1✔
599

600
    try (TaskCloseable ignore = PerfMark.traceTask("NettyClientHandler.createStream")) {
1✔
601
      PerfMark.linkIn(command.getLink());
1✔
602
      PerfMark.attachTag(stream.tag());
1✔
603
      createStreamTraced(
1✔
604
          streamId, stream, headers, command.isGet(), command.shouldBeCountedForInUse(), promise);
1✔
605
    }
606
  }
1✔
607

608
  private void createStreamTraced(
609
      final int streamId,
610
      final NettyClientStream.TransportState stream,
611
      final Http2Headers headers,
612
      boolean isGet,
613
      final boolean shouldBeCountedForInUse,
614
      final ChannelPromise promise) {
615
    // Create an intermediate promise so that we can intercept the failure reported back to the
616
    // application.
617
    ChannelPromise tempPromise = ctx().newPromise();
1✔
618
    encoder().writeHeaders(ctx(), streamId, headers, 0, isGet, tempPromise)
1✔
619
        .addListener(new ChannelFutureListener() {
1✔
620
          @Override
621
          public void operationComplete(ChannelFuture future) throws Exception {
622
            if (future.isSuccess()) {
1✔
623
              // The http2Stream will be null in case a stream buffered in the encoder
624
              // was canceled via RST_STREAM.
625
              Http2Stream http2Stream = connection().stream(streamId);
1✔
626
              if (http2Stream != null) {
1✔
627
                stream.getStatsTraceContext().clientOutboundHeaders();
1✔
628
                http2Stream.setProperty(streamKey, stream);
1✔
629

630
                // This delays the in-use state until the I/O completes, which technically may
631
                // be later than we would like.
632
                if (shouldBeCountedForInUse) {
1✔
633
                  inUseState.updateObjectInUse(http2Stream, true);
1✔
634
                }
635

636
                // Attach the client stream to the HTTP/2 stream object as user data.
637
                stream.setHttp2Stream(http2Stream);
1✔
638
              }
639
              // Otherwise, the stream has been cancelled and Netty is sending a
640
              // RST_STREAM frame which causes it to purge pending writes from the
641
              // flow-controller and delete the http2Stream. The stream listener has already
642
              // been notified of cancellation so there is nothing to do.
643

644
              // Just forward on the success status to the original promise.
645
              promise.setSuccess();
1✔
646
            } else {
1✔
647
              Throwable cause = future.cause();
1✔
648
              if (cause instanceof StreamBufferingEncoder.Http2GoAwayException) {
1✔
649
                StreamBufferingEncoder.Http2GoAwayException e =
1✔
650
                    (StreamBufferingEncoder.Http2GoAwayException) cause;
651
                Status status = statusFromH2Error(
1✔
652
                    Status.Code.UNAVAILABLE, "GOAWAY closed buffered stream",
653
                    e.errorCode(), e.debugData());
1✔
654
                cause = status.asRuntimeException();
1✔
655
                stream.transportReportStatus(status, RpcProgress.MISCARRIED, true, new Metadata());
1✔
656
              } else if (cause instanceof StreamBufferingEncoder.Http2ChannelClosedException) {
1✔
657
                Status status = lifecycleManager.getShutdownStatus();
1✔
658
                if (status == null) {
1✔
659
                  status = Status.UNAVAILABLE.withCause(cause)
×
660
                      .withDescription("Connection closed while stream is buffered");
×
661
                }
662
                stream.transportReportStatus(status, RpcProgress.MISCARRIED, true, new Metadata());
1✔
663
              }
664
              promise.setFailure(cause);
1✔
665
            }
666
          }
1✔
667
        });
668
  }
1✔
669

670
  /**
671
   * Cancels this stream.
672
   */
673
  private void cancelStream(ChannelHandlerContext ctx, CancelClientStreamCommand cmd,
674
      ChannelPromise promise) {
675
    NettyClientStream.TransportState stream = cmd.stream();
1✔
676
    try (TaskCloseable ignore = PerfMark.traceTask("NettyClientHandler.cancelStream")) {
1✔
677
      PerfMark.attachTag(stream.tag());
1✔
678
      PerfMark.linkIn(cmd.getLink());
1✔
679
      Status reason = cmd.reason();
1✔
680
      if (reason != null) {
1✔
681
        stream.transportReportStatus(reason, true, new Metadata());
1✔
682
      }
683
      if (!cmd.stream().isNonExistent()) {
1✔
684
        encoder().writeRstStream(ctx, stream.id(), Http2Error.CANCEL.code(), promise);
1✔
685
      } else {
686
        promise.setSuccess();
1✔
687
      }
688
    }
689
  }
1✔
690

691
  /**
692
   * Sends the given GRPC frame for the stream.
693
   */
694
  private void sendGrpcFrame(ChannelHandlerContext ctx, SendGrpcFrameCommand cmd,
695
      ChannelPromise promise) {
696
    try (TaskCloseable ignore = PerfMark.traceTask("NettyClientHandler.sendGrpcFrame")) {
1✔
697
      PerfMark.attachTag(cmd.stream().tag());
1✔
698
      PerfMark.linkIn(cmd.getLink());
1✔
699
      // Call the base class to write the HTTP/2 DATA frame.
700
      // Note: no need to flush since this is handled by the outbound flow controller.
701
      encoder().writeData(ctx, cmd.stream().id(), cmd.content(), 0, cmd.endStream(), promise);
1✔
702
    }
703
  }
1✔
704

705
  private void sendPingFrame(ChannelHandlerContext ctx, SendPingCommand msg,
706
      ChannelPromise promise) {
707
    try (TaskCloseable ignore = PerfMark.traceTask("NettyClientHandler.sendPingFrame")) {
1✔
708
      PerfMark.linkIn(msg.getLink());
1✔
709
      sendPingFrameTraced(ctx, msg, promise);
1✔
710
    }
711
  }
1✔
712

713
  /**
714
   * Sends a PING frame. If a ping operation is already outstanding, the callback in the message is
715
   * registered to be called when the existing operation completes, and no new frame is sent.
716
   */
717
  private void sendPingFrameTraced(ChannelHandlerContext ctx, SendPingCommand msg,
718
      ChannelPromise promise) {
719
    // Don't check lifecycleManager.getShutdownStatus() since we want to allow pings after shutdown
720
    // but before termination. After termination, messages will no longer arrive because the
721
    // pipeline clears all handlers on channel close.
722

723
    PingCallback callback = msg.callback();
1✔
724
    Executor executor = msg.executor();
1✔
725
    // we only allow one outstanding ping at a time, so just add the callback to
726
    // any outstanding operation
727
    if (ping != null) {
1✔
728
      promise.setSuccess();
1✔
729
      ping.addCallback(callback, executor);
1✔
730
      return;
1✔
731
    }
732

733
    // Use a new promise to prevent calling the callback twice on write failure: here and in
734
    // NettyClientTransport.ping(). It may appear strange, but it will behave the same as if
735
    // ping != null above.
736
    promise.setSuccess();
1✔
737
    promise = ctx().newPromise();
1✔
738
    // set outstanding operation
739
    long data = USER_PING_PAYLOAD;
1✔
740
    Stopwatch stopwatch = stopwatchFactory.get();
1✔
741
    stopwatch.start();
1✔
742
    ping = new Http2Ping(data, stopwatch);
1✔
743
    ping.addCallback(callback, executor);
1✔
744
    // and then write the ping
745
    encoder().writePing(ctx, false, USER_PING_PAYLOAD, promise);
1✔
746
    ctx.flush();
1✔
747
    final Http2Ping finalPing = ping;
1✔
748
    promise.addListener(new ChannelFutureListener() {
1✔
749
      @Override
750
      public void operationComplete(ChannelFuture future) throws Exception {
751
        if (future.isSuccess()) {
1✔
752
          transportTracer.reportKeepAliveSent();
1✔
753
        } else {
754
          Throwable cause = future.cause();
×
755
          if (cause instanceof ClosedChannelException) {
×
756
            cause = lifecycleManager.getShutdownThrowable();
×
757
            if (cause == null) {
×
758
              cause = Status.UNKNOWN.withDescription("Ping failed but for unknown reason.")
×
759
                  .withCause(future.cause()).asException();
×
760
            }
761
          }
762
          finalPing.failed(cause);
×
763
          if (ping == finalPing) {
×
764
            ping = null;
×
765
          }
766
        }
767
      }
1✔
768
    });
769
  }
1✔
770

771
  private void gracefulClose(ChannelHandlerContext ctx, GracefulCloseCommand msg,
772
      ChannelPromise promise) throws Exception {
773
    lifecycleManager.notifyShutdown(msg.getStatus());
1✔
774
    // Explicitly flush to create any buffered streams before sending GOAWAY.
775
    // TODO(ejona): determine if the need to flush is a bug in Netty
776
    flush(ctx);
1✔
777
    close(ctx, promise);
1✔
778
  }
1✔
779

780
  private void forcefulClose(final ChannelHandlerContext ctx, final ForcefulCloseCommand msg,
781
      ChannelPromise promise) throws Exception {
782
    connection().forEachActiveStream(new Http2StreamVisitor() {
1✔
783
      @Override
784
      public boolean visit(Http2Stream stream) throws Http2Exception {
785
        NettyClientStream.TransportState clientStream = clientStream(stream);
1✔
786
        Tag tag = clientStream != null ? clientStream.tag() : PerfMark.createTag();
1✔
787
        try (TaskCloseable ignore = PerfMark.traceTask("NettyClientHandler.forcefulClose")) {
1✔
788
          PerfMark.linkIn(msg.getLink());
1✔
789
          PerfMark.attachTag(tag);
1✔
790
          if (clientStream != null) {
1✔
791
            clientStream.transportReportStatus(msg.getStatus(), true, new Metadata());
1✔
792
            resetStream(ctx, stream.id(), Http2Error.CANCEL.code(), ctx.newPromise());
1✔
793
          }
794
          stream.close();
1✔
795
          return true;
1✔
796
        }
797
      }
798
    });
799
    close(ctx, promise);
1✔
800
  }
1✔
801

802
  /**
803
   * Handler for a GOAWAY being received. Fails any streams created after the
804
   * last known stream. May only be called during a read.
805
   */
806
  private void goingAway(long errorCode, byte[] debugData) {
807
    Status finalStatus = statusFromH2Error(
1✔
808
        Status.Code.UNAVAILABLE, "GOAWAY shut down transport", errorCode, debugData);
809
    lifecycleManager.notifyGracefulShutdown(finalStatus);
1✔
810
    abruptGoAwayStatus = statusFromH2Error(
1✔
811
        Status.Code.UNAVAILABLE, "Abrupt GOAWAY closed unsent stream", errorCode, debugData);
812
    // While this _should_ be UNAVAILABLE, Netty uses the wrong stream id in the GOAWAY when it
813
    // fails streams due to HPACK failures (e.g., header list too large). To be more conservative,
814
    // we assume any sent streams may be related to the GOAWAY. This should rarely impact users
815
    // since the main time servers should use abrupt GOAWAYs is if there is a protocol error, and if
816
    // there wasn't a protocol error the error code was probably NO_ERROR which is mapped to
817
    // UNAVAILABLE. https://github.com/netty/netty/issues/10670
818
    final Status abruptGoAwayStatusConservative = statusFromH2Error(
1✔
819
        null, "Abrupt GOAWAY closed sent stream", errorCode, debugData);
820
    final boolean mayBeHittingNettyBug = errorCode != Http2Error.NO_ERROR.code();
1✔
821
    // Try to allocate as many in-flight streams as possible, to reduce race window of
822
    // https://github.com/grpc/grpc-java/issues/2562 . To be of any help, the server has to
823
    // gracefully shut down the connection with two GOAWAYs. gRPC servers generally send a PING
824
    // after the first GOAWAY, so they can very precisely detect when the GOAWAY has been
825
    // processed and thus this processing must be in-line before processing additional reads.
826

827
    // This can cause reentrancy, but should be minor since it is normal to handle writes in
828
    // response to a read. Also, the call stack is rather shallow at this point
829
    clientWriteQueue.drainNow();
1✔
830
    if (lifecycleManager.notifyShutdown(finalStatus)) {
1✔
831
      // This is for the only RPCs that are actually covered by the GOAWAY error code. All other
832
      // RPCs were not observed by the remote and so should be UNAVAILABLE.
833
      channelInactiveReason = statusFromH2Error(
1✔
834
          null, "Connection closed after GOAWAY", errorCode, debugData);
835
    }
836

837
    final int lastKnownStream = connection().local().lastStreamKnownByPeer();
1✔
838
    try {
839
      connection().forEachActiveStream(new Http2StreamVisitor() {
1✔
840
        @Override
841
        public boolean visit(Http2Stream stream) throws Http2Exception {
842
          if (stream.id() > lastKnownStream) {
1✔
843
            NettyClientStream.TransportState clientStream = clientStream(stream);
1✔
844
            if (clientStream != null) {
1✔
845
              // RpcProgress _should_ be REFUSED, but are being conservative. See comment for
846
              // abruptGoAwayStatusConservative. This does reduce our ability to perform transparent
847
              // retries, but only if something else caused a connection failure.
848
              RpcProgress progress = mayBeHittingNettyBug
1✔
849
                  ? RpcProgress.PROCESSED
1✔
850
                  : RpcProgress.REFUSED;
1✔
851
              clientStream.transportReportStatus(
1✔
852
                  abruptGoAwayStatusConservative, progress, false, new Metadata());
853
            }
854
            stream.close();
1✔
855
          }
856
          return true;
1✔
857
        }
858
      });
859
    } catch (Http2Exception e) {
×
860
      throw new RuntimeException(e);
×
861
    }
1✔
862
  }
1✔
863

864
  private void cancelPing(Throwable t) {
865
    if (ping != null) {
1✔
866
      ping.failed(t);
1✔
867
      ping = null;
1✔
868
    }
869
  }
1✔
870

871
  /** If {@code statusCode} is non-null, it will be used instead of the http2 error code mapping. */
872
  private Status statusFromH2Error(
873
      Status.Code statusCode, String context, long errorCode, byte[] debugData) {
874
    Status status = GrpcUtil.Http2Error.statusForCode(errorCode);
1✔
875
    if (statusCode == null) {
1✔
876
      statusCode = status.getCode();
1✔
877
    }
878
    String debugString = "";
1✔
879
    if (debugData != null && debugData.length > 0) {
1✔
880
      // If a debug message was provided, use it.
881
      debugString = ", debug data: " + new String(debugData, UTF_8);
1✔
882
    }
883
    return statusCode.toStatus()
1✔
884
        .withDescription(context + ". " + status.getDescription() + debugString);
1✔
885
  }
886

887
  /**
888
   * Gets the client stream associated to the given HTTP/2 stream object.
889
   */
890
  private NettyClientStream.TransportState clientStream(Http2Stream stream) {
891
    return stream == null ? null : (NettyClientStream.TransportState) stream.getProperty(streamKey);
1✔
892
  }
893

894
  private int incrementAndGetNextStreamId() throws StatusException {
895
    int nextStreamId = connection().local().incrementAndGetNextStreamId();
1✔
896
    if (nextStreamId < 0) {
1✔
897
      logger.fine("Stream IDs have been exhausted for this connection. "
1✔
898
              + "Initiating graceful shutdown of the connection.");
899
      throw EXHAUSTED_STREAMS_STATUS.asException();
1✔
900
    }
901
    return nextStreamId;
1✔
902
  }
903

904
  private Http2Stream requireHttp2Stream(int streamId) {
905
    Http2Stream stream = connection().stream(streamId);
1✔
906
    if (stream == null) {
1✔
907
      // This should never happen.
908
      throw new AssertionError("Stream does not exist: " + streamId);
×
909
    }
910
    return stream;
1✔
911
  }
912

913
  private class FrameListener extends Http2FrameAdapter {
1✔
914
    private boolean firstSettings = true;
1✔
915

916
    @Override
917
    public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) {
918
      if (firstSettings) {
1✔
919
        firstSettings = false;
1✔
920
        lifecycleManager.notifyReady();
1✔
921
      }
922
    }
1✔
923

924
    @Override
925
    public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
926
        boolean endOfStream) throws Http2Exception {
927
      NettyClientHandler.this.onDataRead(streamId, data, padding, endOfStream);
1✔
928
      return padding;
1✔
929
    }
930

931
    @Override
932
    public void onHeadersRead(ChannelHandlerContext ctx,
933
        int streamId,
934
        Http2Headers headers,
935
        int streamDependency,
936
        short weight,
937
        boolean exclusive,
938
        int padding,
939
        boolean endStream) throws Http2Exception {
940
      NettyClientHandler.this.onHeadersRead(streamId, headers, endStream);
1✔
941
    }
1✔
942

943
    @Override
944
    public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode)
945
        throws Http2Exception {
946
      NettyClientHandler.this.onRstStreamRead(streamId, errorCode);
1✔
947
    }
1✔
948

949
    @Override
950
    public void onPingAckRead(ChannelHandlerContext ctx, long ackPayload) throws Http2Exception {
951
      Http2Ping p = ping;
1✔
952
      if (ackPayload == flowControlPing().payload()) {
1✔
953
        flowControlPing().updateWindow();
1✔
954
        logger.log(Level.FINE, "Window: {0}",
1✔
955
            decoder().flowController().initialWindowSize(connection().connectionStream()));
1✔
956
      } else if (p != null) {
1✔
957
        if (p.payload() == ackPayload) {
1✔
958
          p.complete();
1✔
959
          ping = null;
1✔
960
        } else {
961
          logger.log(Level.WARNING,
1✔
962
              "Received unexpected ping ack. Expecting {0}, got {1}",
963
              new Object[] {p.payload(), ackPayload});
1✔
964
        }
965
      } else {
966
        logger.warning("Received unexpected ping ack. No ping outstanding");
×
967
      }
968
      if (keepAliveManager != null) {
1✔
969
        keepAliveManager.onDataReceived();
1✔
970
      }
971
    }
1✔
972

973
    @Override
974
    public void onPingRead(ChannelHandlerContext ctx, long data) throws Http2Exception {
975
      if (keepAliveManager != null) {
1✔
976
        keepAliveManager.onDataReceived();
×
977
      }
978
    }
1✔
979
  }
980

981
  private static class PingCountingFrameWriter extends DecoratingHttp2FrameWriter
982
      implements AbstractNettyHandler.PingLimiter {
983
    private int pingCount;
984

985
    public PingCountingFrameWriter(Http2FrameWriter delegate) {
986
      super(delegate);
1✔
987
    }
1✔
988

989
    @Override
990
    public boolean isPingAllowed() {
991
      // "3 strikes" may cause the server to complain, so we limit ourselves to 2 or below.
992
      return pingCount < 2;
1✔
993
    }
994

995
    @Override
996
    public ChannelFuture writeHeaders(
997
        ChannelHandlerContext ctx, int streamId, Http2Headers headers,
998
        int padding, boolean endStream, ChannelPromise promise) {
999
      pingCount = 0;
×
1000
      return super.writeHeaders(ctx, streamId, headers, padding, endStream, promise);
×
1001
    }
1002

1003
    @Override
1004
    public ChannelFuture writeHeaders(
1005
        ChannelHandlerContext ctx, int streamId, Http2Headers headers,
1006
        int streamDependency, short weight, boolean exclusive,
1007
        int padding, boolean endStream, ChannelPromise promise) {
1008
      pingCount = 0;
1✔
1009
      return super.writeHeaders(ctx, streamId, headers, streamDependency, weight, exclusive,
1✔
1010
          padding, endStream, promise);
1011
    }
1012

1013
    @Override
1014
    public ChannelFuture writeWindowUpdate(
1015
        ChannelHandlerContext ctx, int streamId, int windowSizeIncrement, ChannelPromise promise) {
1016
      pingCount = 0;
1✔
1017
      return super.writeWindowUpdate(ctx, streamId, windowSizeIncrement, promise);
1✔
1018
    }
1019

1020
    @Override
1021
    public ChannelFuture writePing(
1022
        ChannelHandlerContext ctx, boolean ack, long data, ChannelPromise promise) {
1023
      if (!ack) {
1✔
1024
        pingCount++;
1✔
1025
      }
1026
      return super.writePing(ctx, ack, data, promise);
1✔
1027
    }
1028

1029
    @Override
1030
    public ChannelFuture writeData(
1031
        ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endStream,
1032
        ChannelPromise promise) {
1033
      if (data.isReadable()) {
1✔
1034
        pingCount = 0;
1✔
1035
      }
1036
      return super.writeData(ctx, streamId, data, padding, endStream, promise);
1✔
1037
    }
1038
  }
1039
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc