• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19913

17 Jul 2025 09:55PM UTC coverage: 88.569%. Remained the same
#19913

push

github

ejona86
Guarantee missing stream promise delivery

In observed cases, whether caused by RST_STREAM or by another failure from Netty
or the server, listeners can fail to be notified when a connection yields a
null stream for the selected streamId. This causes hangs in clients,
despite deadlines, with no obvious resolution.

Tests which relied upon this promise succeeding must now change.

34674 of 39149 relevant lines covered (88.57%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.51
/../netty/src/main/java/io/grpc/netty/NettyClientHandler.java
1
/*
2
 * Copyright 2014 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.netty;
18

19
import static io.netty.handler.codec.http2.DefaultHttp2LocalFlowController.DEFAULT_WINDOW_UPDATE_RATIO;
20
import static io.netty.util.CharsetUtil.UTF_8;
21
import static io.netty.util.internal.ObjectUtil.checkNotNull;
22

23
import com.google.common.annotations.VisibleForTesting;
24
import com.google.common.base.Preconditions;
25
import com.google.common.base.Stopwatch;
26
import com.google.common.base.Supplier;
27
import com.google.common.base.Ticker;
28
import io.grpc.Attributes;
29
import io.grpc.ChannelLogger;
30
import io.grpc.InternalChannelz;
31
import io.grpc.InternalStatus;
32
import io.grpc.Metadata;
33
import io.grpc.Status;
34
import io.grpc.StatusException;
35
import io.grpc.internal.ClientStreamListener.RpcProgress;
36
import io.grpc.internal.ClientTransport.PingCallback;
37
import io.grpc.internal.GrpcAttributes;
38
import io.grpc.internal.GrpcUtil;
39
import io.grpc.internal.Http2Ping;
40
import io.grpc.internal.InUseStateAggregator;
41
import io.grpc.internal.KeepAliveManager;
42
import io.grpc.internal.TransportTracer;
43
import io.grpc.netty.GrpcHttp2HeadersUtils.GrpcHttp2ClientHeadersDecoder;
44
import io.netty.buffer.ByteBuf;
45
import io.netty.buffer.ByteBufUtil;
46
import io.netty.buffer.Unpooled;
47
import io.netty.channel.Channel;
48
import io.netty.channel.ChannelFuture;
49
import io.netty.channel.ChannelFutureListener;
50
import io.netty.channel.ChannelHandlerContext;
51
import io.netty.channel.ChannelPromise;
52
import io.netty.handler.codec.http2.DecoratingHttp2FrameWriter;
53
import io.netty.handler.codec.http2.DefaultHttp2Connection;
54
import io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder;
55
import io.netty.handler.codec.http2.DefaultHttp2ConnectionEncoder;
56
import io.netty.handler.codec.http2.DefaultHttp2FrameReader;
57
import io.netty.handler.codec.http2.DefaultHttp2FrameWriter;
58
import io.netty.handler.codec.http2.DefaultHttp2HeadersEncoder;
59
import io.netty.handler.codec.http2.DefaultHttp2LocalFlowController;
60
import io.netty.handler.codec.http2.DefaultHttp2RemoteFlowController;
61
import io.netty.handler.codec.http2.Http2CodecUtil;
62
import io.netty.handler.codec.http2.Http2Connection;
63
import io.netty.handler.codec.http2.Http2ConnectionAdapter;
64
import io.netty.handler.codec.http2.Http2ConnectionDecoder;
65
import io.netty.handler.codec.http2.Http2ConnectionEncoder;
66
import io.netty.handler.codec.http2.Http2Error;
67
import io.netty.handler.codec.http2.Http2Exception;
68
import io.netty.handler.codec.http2.Http2FrameAdapter;
69
import io.netty.handler.codec.http2.Http2FrameLogger;
70
import io.netty.handler.codec.http2.Http2FrameReader;
71
import io.netty.handler.codec.http2.Http2FrameWriter;
72
import io.netty.handler.codec.http2.Http2Headers;
73
import io.netty.handler.codec.http2.Http2HeadersDecoder;
74
import io.netty.handler.codec.http2.Http2HeadersEncoder;
75
import io.netty.handler.codec.http2.Http2InboundFrameLogger;
76
import io.netty.handler.codec.http2.Http2OutboundFrameLogger;
77
import io.netty.handler.codec.http2.Http2Settings;
78
import io.netty.handler.codec.http2.Http2Stream;
79
import io.netty.handler.codec.http2.Http2StreamVisitor;
80
import io.netty.handler.codec.http2.StreamBufferingEncoder;
81
import io.netty.handler.codec.http2.UniformStreamByteDistributor;
82
import io.netty.handler.logging.LogLevel;
83
import io.perfmark.PerfMark;
84
import io.perfmark.Tag;
85
import io.perfmark.TaskCloseable;
86
import java.nio.channels.ClosedChannelException;
87
import java.util.LinkedHashMap;
88
import java.util.Map;
89
import java.util.concurrent.Executor;
90
import java.util.logging.Level;
91
import java.util.logging.Logger;
92
import javax.annotation.Nullable;
93

94
/**
95
 * Client-side Netty handler for gRPC processing. All event handlers are executed entirely within
96
 * the context of the Netty Channel thread.
97
 */
98
class NettyClientHandler extends AbstractNettyHandler {
99
  private static final Logger logger = Logger.getLogger(NettyClientHandler.class.getName());
1✔
100
  static boolean enablePerRpcAuthorityCheck =
1✔
101
      GrpcUtil.getFlag("GRPC_ENABLE_PER_RPC_AUTHORITY_CHECK", false);
1✔
102

103
  /**
104
   * A message that simply passes through the channel without any real processing. It is useful to
105
   * check if buffers have been drained and test the health of the channel in a single operation.
106
   */
107
  static final Object NOOP_MESSAGE = new Object();
1✔
108

109
  /**
110
   * Status used when the transport has exhausted the number of streams.
111
   */
112
  private static final Status EXHAUSTED_STREAMS_STATUS =
1✔
113
          Status.UNAVAILABLE.withDescription("Stream IDs have been exhausted");
1✔
114
  private static final long USER_PING_PAYLOAD = 1111;
115

116
  private final Http2Connection.PropertyKey streamKey;
117
  private final ClientTransportLifecycleManager lifecycleManager;
118
  private final KeepAliveManager keepAliveManager;
119
  // Returns new unstarted stopwatches
120
  private final Supplier<Stopwatch> stopwatchFactory;
121
  private final TransportTracer transportTracer;
122
  private final Attributes eagAttributes;
123
  private final String authority;
124
  private final InUseStateAggregator<Http2Stream> inUseState =
1✔
125
      new InUseStateAggregator<Http2Stream>() {
1✔
126
        @Override
127
        protected void handleInUse() {
128
          lifecycleManager.notifyInUse(true);
1✔
129
        }
1✔
130

131
        @Override
132
        protected void handleNotInUse() {
133
          lifecycleManager.notifyInUse(false);
1✔
134
        }
1✔
135
      };
136
  private final Map<String, Status> peerVerificationResults =
1✔
137
      new LinkedHashMap<String, Status>() {
1✔
138
        @Override
139
        protected boolean removeEldestEntry(Map.Entry<String, Status> eldest) {
140
          return size() > 100;
1✔
141
        }
142
      };
143

144
  private WriteQueue clientWriteQueue;
145
  private Http2Ping ping;
146
  private Attributes attributes;
147
  private InternalChannelz.Security securityInfo;
148
  private Status abruptGoAwayStatus;
149
  private Status channelInactiveReason;
150

151
  static NettyClientHandler newHandler(
152
      ClientTransportLifecycleManager lifecycleManager,
153
      @Nullable KeepAliveManager keepAliveManager,
154
      boolean autoFlowControl,
155
      int flowControlWindow,
156
      int maxHeaderListSize,
157
      int softLimitHeaderListSize,
158
      Supplier<Stopwatch> stopwatchFactory,
159
      Runnable tooManyPingsRunnable,
160
      TransportTracer transportTracer,
161
      Attributes eagAttributes,
162
      String authority,
163
      ChannelLogger negotiationLogger,
164
      Ticker ticker) {
165
    Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive");
1✔
166
    Http2HeadersDecoder headersDecoder = new GrpcHttp2ClientHeadersDecoder(maxHeaderListSize);
1✔
167
    Http2FrameReader frameReader = new DefaultHttp2FrameReader(headersDecoder);
1✔
168
    Http2HeadersEncoder encoder = new DefaultHttp2HeadersEncoder(
1✔
169
        Http2HeadersEncoder.NEVER_SENSITIVE, false, 16, Integer.MAX_VALUE);
170
    Http2FrameWriter frameWriter = new DefaultHttp2FrameWriter(encoder);
1✔
171
    Http2Connection connection = new DefaultHttp2Connection(false);
1✔
172
    UniformStreamByteDistributor dist = new UniformStreamByteDistributor(connection);
1✔
173
    dist.minAllocationChunk(MIN_ALLOCATED_CHUNK); // Increased for benchmarks performance.
1✔
174
    DefaultHttp2RemoteFlowController controller =
1✔
175
        new DefaultHttp2RemoteFlowController(connection, dist);
176
    connection.remote().flowController(controller);
1✔
177

178
    return newHandler(
1✔
179
        connection,
180
        frameReader,
181
        frameWriter,
182
        lifecycleManager,
183
        keepAliveManager,
184
        autoFlowControl,
185
        flowControlWindow,
186
        maxHeaderListSize,
187
        softLimitHeaderListSize,
188
        stopwatchFactory,
189
        tooManyPingsRunnable,
190
        transportTracer,
191
        eagAttributes,
192
        authority,
193
        negotiationLogger,
194
        ticker);
195
  }
196

197
  @VisibleForTesting
198
  static NettyClientHandler newHandler(
199
      final Http2Connection connection,
200
      Http2FrameReader frameReader,
201
      Http2FrameWriter frameWriter,
202
      ClientTransportLifecycleManager lifecycleManager,
203
      KeepAliveManager keepAliveManager,
204
      boolean autoFlowControl,
205
      int flowControlWindow,
206
      int maxHeaderListSize,
207
      int softLimitHeaderListSize,
208
      Supplier<Stopwatch> stopwatchFactory,
209
      Runnable tooManyPingsRunnable,
210
      TransportTracer transportTracer,
211
      Attributes eagAttributes,
212
      String authority,
213
      ChannelLogger negotiationLogger,
214
      Ticker ticker) {
215
    Preconditions.checkNotNull(connection, "connection");
1✔
216
    Preconditions.checkNotNull(frameReader, "frameReader");
1✔
217
    Preconditions.checkNotNull(lifecycleManager, "lifecycleManager");
1✔
218
    Preconditions.checkArgument(flowControlWindow > 0, "flowControlWindow must be positive");
1✔
219
    Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive");
1✔
220
    Preconditions.checkArgument(softLimitHeaderListSize > 0,
1✔
221
        "softLimitHeaderListSize must be positive");
222
    Preconditions.checkNotNull(stopwatchFactory, "stopwatchFactory");
1✔
223
    Preconditions.checkNotNull(tooManyPingsRunnable, "tooManyPingsRunnable");
1✔
224
    Preconditions.checkNotNull(eagAttributes, "eagAttributes");
1✔
225
    Preconditions.checkNotNull(authority, "authority");
1✔
226

227
    Http2FrameLogger frameLogger = new Http2FrameLogger(LogLevel.DEBUG, NettyClientHandler.class);
1✔
228
    frameReader = new Http2InboundFrameLogger(frameReader, frameLogger);
1✔
229
    frameWriter = new Http2OutboundFrameLogger(frameWriter, frameLogger);
1✔
230

231
    PingCountingFrameWriter pingCounter;
232
    frameWriter = pingCounter = new PingCountingFrameWriter(frameWriter);
1✔
233

234
    StreamBufferingEncoder encoder =
1✔
235
        new StreamBufferingEncoder(
236
            new DefaultHttp2ConnectionEncoder(connection, frameWriter));
237

238
    // Create the local flow controller configured to auto-refill the connection window.
239
    connection.local().flowController(
1✔
240
        new DefaultHttp2LocalFlowController(connection, DEFAULT_WINDOW_UPDATE_RATIO, true));
241

242
    Http2ConnectionDecoder decoder = new DefaultHttp2ConnectionDecoder(connection, encoder,
1✔
243
        frameReader);
244

245
    transportTracer.setFlowControlWindowReader(new Utils.FlowControlReader(connection));
1✔
246

247
    Http2Settings settings = new Http2Settings();
1✔
248
    settings.pushEnabled(false);
1✔
249
    settings.initialWindowSize(flowControlWindow);
1✔
250
    settings.maxConcurrentStreams(0);
1✔
251
    settings.maxHeaderListSize(maxHeaderListSize);
1✔
252

253
    return new NettyClientHandler(
1✔
254
        decoder,
255
        encoder,
256
        settings,
257
        negotiationLogger,
258
        lifecycleManager,
259
        keepAliveManager,
260
        stopwatchFactory,
261
        tooManyPingsRunnable,
262
        transportTracer,
263
        eagAttributes,
264
        authority,
265
        autoFlowControl,
266
        pingCounter,
267
        ticker,
268
        maxHeaderListSize,
269
        softLimitHeaderListSize);
270
  }
271

272
  /**
   * Stores the collaborators, installs the frame listener on the decoder and registers a
   * connection listener that reacts to GOAWAY frames and to streams becoming active or closed.
   */
  private NettyClientHandler(
      Http2ConnectionDecoder decoder,
      Http2ConnectionEncoder encoder,
      Http2Settings settings,
      ChannelLogger negotiationLogger,
      ClientTransportLifecycleManager lifecycleManager,
      KeepAliveManager keepAliveManager,
      Supplier<Stopwatch> stopwatchFactory,
      final Runnable tooManyPingsRunnable,
      TransportTracer transportTracer,
      Attributes eagAttributes,
      String authority,
      boolean autoFlowControl,
      PingLimiter pingLimiter,
      Ticker ticker,
      int maxHeaderListSize,
      int softLimitHeaderListSize) {
    super(
        /* channelUnused= */ null,
        decoder,
        encoder,
        settings,
        negotiationLogger,
        autoFlowControl,
        pingLimiter,
        ticker,
        maxHeaderListSize,
        softLimitHeaderListSize);
    this.lifecycleManager = lifecycleManager;
    this.keepAliveManager = keepAliveManager;
    this.stopwatchFactory = stopwatchFactory;
    this.transportTracer = Preconditions.checkNotNull(transportTracer);
    this.eagAttributes = eagAttributes;
    this.authority = authority;
    // Until protocol negotiation completes, the only attribute exposed is the EAG attributes.
    this.attributes = Attributes.newBuilder()
        .set(GrpcAttributes.ATTR_CLIENT_EAG_ATTRS, eagAttributes).build();

    // Set the frame listener on the decoder.
    decoder().frameListener(new FrameListener());

    Http2Connection http2Connection = encoder.connection();
    streamKey = http2Connection.newKey();

    http2Connection.addListener(new Http2ConnectionAdapter() {
      @Override
      public void onGoAwayReceived(int lastStreamId, long errorCode, ByteBuf debugData) {
        byte[] debugDataBytes = ByteBufUtil.getBytes(debugData);
        goingAway(errorCode, debugDataBytes);
        if (errorCode != Http2Error.ENHANCE_YOUR_CALM.code()) {
          return;
        }
        String data = new String(debugDataBytes, UTF_8);
        logger.log(
            Level.WARNING, "Received GOAWAY with ENHANCE_YOUR_CALM. Debug data: {0}", data);
        if ("too_many_pings".equals(data)) {
          tooManyPingsRunnable.run();
        }
      }

      @Override
      public void onStreamActive(Http2Stream stream) {
        // Only the first active stream flips keepalive into the "active" state.
        KeepAliveManager manager = NettyClientHandler.this.keepAliveManager;
        if (manager != null && connection().numActiveStreams() == 1) {
          manager.onTransportActive();
        }
      }

      @Override
      public void onStreamClosed(Http2Stream stream) {
        // Although streams with CALL_OPTIONS_RPC_OWNED_BY_BALANCER are not marked as "in-use" in
        // the first place, we don't propagate that option here, and it's safe to reset the in-use
        // state for them, which will be a cheap no-op.
        inUseState.updateObjectInUse(stream, false);
        KeepAliveManager manager = NettyClientHandler.this.keepAliveManager;
        if (manager != null && connection().numActiveStreams() == 0) {
          manager.onTransportIdle();
        }
      }
    });
  }
1✔
351

352
  /**
353
   * The protocol negotiation attributes, available once the protocol negotiation completes;
354
   * otherwise returns {@code Attributes.EMPTY}.
355
   */
356
  Attributes getAttributes() {
357
    return attributes;
1✔
358
  }
359

360
  /**
361
   * Handler for commands sent from the stream.
362
   */
363
  @Override
364
  public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
365
          throws Exception {
366
    if (msg instanceof CreateStreamCommand) {
1✔
367
      createStream((CreateStreamCommand) msg, promise);
1✔
368
    } else if (msg instanceof SendGrpcFrameCommand) {
1✔
369
      sendGrpcFrame(ctx, (SendGrpcFrameCommand) msg, promise);
1✔
370
    } else if (msg instanceof CancelClientStreamCommand) {
1✔
371
      cancelStream(ctx, (CancelClientStreamCommand) msg, promise);
1✔
372
    } else if (msg instanceof SendPingCommand) {
1✔
373
      sendPingFrame(ctx, (SendPingCommand) msg, promise);
1✔
374
    } else if (msg instanceof GracefulCloseCommand) {
1✔
375
      gracefulClose(ctx, (GracefulCloseCommand) msg, promise);
1✔
376
    } else if (msg instanceof ForcefulCloseCommand) {
1✔
377
      forcefulClose(ctx, (ForcefulCloseCommand) msg, promise);
1✔
378
    } else if (msg == NOOP_MESSAGE) {
1✔
379
      ctx.write(Unpooled.EMPTY_BUFFER, promise);
1✔
380
    } else {
381
      throw new AssertionError("Write called for unexpected type: " + msg.getClass().getName());
×
382
    }
383
  }
1✔
384

385
  void startWriteQueue(Channel channel) {
386
    clientWriteQueue = new WriteQueue(channel);
1✔
387
  }
1✔
388

389
  WriteQueue getWriteQueue() {
390
    return clientWriteQueue;
1✔
391
  }
392

393
  ClientTransportLifecycleManager getLifecycleManager() {
394
    return lifecycleManager;
1✔
395
  }
396

397
  /**
398
   * Returns the given processed bytes back to inbound flow control.
399
   */
400
  void returnProcessedBytes(Http2Stream stream, int bytes) {
401
    try {
402
      decoder().flowController().consumeBytes(stream, bytes);
1✔
403
    } catch (Http2Exception e) {
×
404
      throw new RuntimeException(e);
×
405
    }
1✔
406
  }
1✔
407

408
  private void onHeadersRead(int streamId, Http2Headers headers, boolean endStream) {
409
    // Stream 1 is reserved for the Upgrade response, so we should ignore its headers here:
410
    if (streamId != Http2CodecUtil.HTTP_UPGRADE_STREAM_ID) {
1✔
411
      NettyClientStream.TransportState stream = clientStream(requireHttp2Stream(streamId));
1✔
412
      PerfMark.event("NettyClientHandler.onHeadersRead", stream.tag());
1✔
413
      // check metadata size vs soft limit
414
      int h2HeadersSize = Utils.getH2HeadersSize(headers);
1✔
415
      boolean shouldFail =
1✔
416
          Utils.shouldRejectOnMetadataSizeSoftLimitExceeded(
1✔
417
              h2HeadersSize, softLimitHeaderListSize, maxHeaderListSize);
418
      if (shouldFail && endStream) {
1✔
419
        stream.transportReportStatus(Status.RESOURCE_EXHAUSTED
×
420
            .withDescription(
×
421
                String.format(
×
422
                    "Server Status + Trailers of size %d exceeded Metadata size soft limit: %d",
423
                    h2HeadersSize,
×
424
                    softLimitHeaderListSize)), true, new Metadata());
×
425
        return;
×
426
      } else if (shouldFail) {
1✔
427
        stream.transportReportStatus(Status.RESOURCE_EXHAUSTED
1✔
428
            .withDescription(
1✔
429
                String.format(
1✔
430
                    "Server Headers of size %d exceeded Metadata size soft limit: %d",
431
                    h2HeadersSize,
1✔
432
                    softLimitHeaderListSize)), true, new Metadata());
1✔
433
        return;
1✔
434
      }
435
      stream.transportHeadersReceived(headers, endStream);
1✔
436
    }
437

438
    if (keepAliveManager != null) {
1✔
439
      keepAliveManager.onDataReceived();
1✔
440
    }
441
  }
1✔
442

443
  /**
444
   * Handler for an inbound HTTP/2 DATA frame.
445
   */
446
  private void onDataRead(int streamId, ByteBuf data, int padding, boolean endOfStream) {
447
    flowControlPing().onDataRead(data.readableBytes(), padding);
1✔
448
    NettyClientStream.TransportState stream = clientStream(requireHttp2Stream(streamId));
1✔
449
    PerfMark.event("NettyClientHandler.onDataRead", stream.tag());
1✔
450
    stream.transportDataReceived(data, endOfStream);
1✔
451
    if (keepAliveManager != null) {
1✔
452
      keepAliveManager.onDataReceived();
1✔
453
    }
454
  }
1✔
455

456
  /**
457
   * Handler for an inbound HTTP/2 RST_STREAM frame, terminating a stream.
458
   */
459
  private void onRstStreamRead(int streamId, long errorCode) {
460
    NettyClientStream.TransportState stream = clientStream(connection().stream(streamId));
1✔
461
    if (stream != null) {
1✔
462
      PerfMark.event("NettyClientHandler.onRstStreamRead", stream.tag());
1✔
463
      Status status = statusFromH2Error(null, "RST_STREAM closed stream", errorCode, null);
1✔
464
      stream.transportReportStatus(
1✔
465
          status,
466
          errorCode == Http2Error.REFUSED_STREAM.code()
1✔
467
              ? RpcProgress.REFUSED : RpcProgress.PROCESSED,
1✔
468
          false /*stop delivery*/,
469
          new Metadata());
470
      if (keepAliveManager != null) {
1✔
471
        keepAliveManager.onDataReceived();
1✔
472
      }
473
    }
474
  }
1✔
475

476
  @Override
477
  public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
478
    logger.fine("Network channel being closed by the application.");
1✔
479
    if (ctx.channel().isActive()) { // Ignore notification that the socket was closed
1✔
480
      lifecycleManager.notifyShutdown(
1✔
481
          Status.UNAVAILABLE.withDescription("Transport closed for unknown reason"));
1✔
482
    }
483
    super.close(ctx, promise);
1✔
484
  }
1✔
485

486
  /**
487
   * Handler for the Channel shutting down.
488
   */
489
  @Override
490
  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
491
    try {
492
      logger.fine("Network channel is closed");
1✔
493
      Status status = Status.UNAVAILABLE.withDescription("Network closed for unknown reason");
1✔
494
      lifecycleManager.notifyShutdown(status);
1✔
495
      final Status streamStatus;
496
      if (channelInactiveReason != null) {
1✔
497
        streamStatus = channelInactiveReason;
1✔
498
      } else {
499
        streamStatus = lifecycleManager.getShutdownStatus();
1✔
500
      }
501
      try {
502
        cancelPing(lifecycleManager.getShutdownStatus());
1✔
503
        // Report status to the application layer for any open streams
504
        connection().forEachActiveStream(new Http2StreamVisitor() {
1✔
505
          @Override
506
          public boolean visit(Http2Stream stream) throws Http2Exception {
507
            NettyClientStream.TransportState clientStream = clientStream(stream);
1✔
508
            if (clientStream != null) {
1✔
509
              clientStream.transportReportStatus(streamStatus, false, new Metadata());
1✔
510
            }
511
            return true;
1✔
512
          }
513
        });
514
      } finally {
515
        lifecycleManager.notifyTerminated(status);
1✔
516
      }
517
    } finally {
518
      // Close any open streams
519
      super.channelInactive(ctx);
1✔
520
      if (keepAliveManager != null) {
1✔
521
        keepAliveManager.onTransportTermination();
1✔
522
      }
523
    }
524
  }
1✔
525

526
  @Override
527
  public void handleProtocolNegotiationCompleted(
528
      Attributes attributes, InternalChannelz.Security securityInfo) {
529
    this.attributes = this.attributes.toBuilder().setAll(attributes).build();
1✔
530
    this.securityInfo = securityInfo;
1✔
531
    super.handleProtocolNegotiationCompleted(attributes, securityInfo);
1✔
532
    writeBufferingAndRemove(ctx().channel());
1✔
533
  }
1✔
534

535
  static void writeBufferingAndRemove(Channel channel) {
536
    checkNotNull(channel, "channel");
1✔
537
    ChannelHandlerContext handlerCtx =
1✔
538
        channel.pipeline().context(WriteBufferingAndExceptionHandler.class);
1✔
539
    if (handlerCtx == null) {
1✔
540
      return;
1✔
541
    }
542
    ((WriteBufferingAndExceptionHandler) handlerCtx.handler()).writeBufferedAndRemove(handlerCtx);
1✔
543
  }
1✔
544

545
  @Override
546
  public Attributes getEagAttributes() {
547
    return eagAttributes;
1✔
548
  }
549

550
  @Override
551
  public String getAuthority() {
552
    return authority;
1✔
553
  }
554

555
  InternalChannelz.Security getSecurityInfo() {
556
    return securityInfo;
1✔
557
  }
558

559
  @Override
560
  protected void onConnectionError(ChannelHandlerContext ctx,  boolean outbound, Throwable cause,
561
      Http2Exception http2Ex) {
562
    logger.log(Level.FINE, "Caught a connection error", cause);
1✔
563
    lifecycleManager.notifyShutdown(Utils.statusFromThrowable(cause));
1✔
564
    // Parent class will shut down the Channel
565
    super.onConnectionError(ctx, outbound, cause, http2Ex);
1✔
566
  }
1✔
567

568
  @Override
569
  protected void onStreamError(ChannelHandlerContext ctx, boolean outbound, Throwable cause,
570
      Http2Exception.StreamException http2Ex) {
571
    // Close the stream with a status that contains the cause.
572
    NettyClientStream.TransportState stream = clientStream(connection().stream(http2Ex.streamId()));
1✔
573
    if (stream != null) {
1✔
574
      stream.transportReportStatus(Utils.statusFromThrowable(cause), false, new Metadata());
1✔
575
    } else {
576
      logger.log(Level.FINE, "Stream error for unknown stream " + http2Ex.streamId(), cause);
1✔
577
    }
578

579
    // Delegate to the base class to send a RST_STREAM.
580
    super.onStreamError(ctx, outbound, cause, http2Ex);
1✔
581
  }
1✔
582

583
  @Override
584
  protected boolean isGracefulShutdownComplete() {
585
    // Only allow graceful shutdown to complete after all pending streams have completed.
586
    return super.isGracefulShutdownComplete()
1✔
587
        && ((StreamBufferingEncoder) encoder()).numBufferedStreams() == 0;
1✔
588
  }
589

590
  /**
591
   * Attempts to create a new stream from the given command. If there are too many active streams,
592
   * the creation request is queued.
593
   */
594
  private void createStream(CreateStreamCommand command, ChannelPromise promise)
595
          throws Exception {
596
    if (lifecycleManager.getShutdownStatus() != null) {
1✔
597
      command.stream().setNonExistent();
1✔
598
      // The connection is going away (it is really the GOAWAY case),
599
      // just terminate the stream now.
600
      command.stream().transportReportStatus(
1✔
601
          lifecycleManager.getShutdownStatus(), RpcProgress.MISCARRIED, true, new Metadata());
1✔
602
      promise.setFailure(InternalStatus.asRuntimeExceptionWithoutStacktrace(
1✔
603
              lifecycleManager.getShutdownStatus(), null));
1✔
604
      return;
1✔
605
    }
606

607
    CharSequence authorityHeader = command.headers().authority();
1✔
608
    if (authorityHeader == null) {
1✔
609
      Status authorityVerificationStatus = Status.UNAVAILABLE.withDescription(
1✔
610
              "Missing authority header");
611
      command.stream().setNonExistent();
1✔
612
      command.stream().transportReportStatus(
1✔
613
              Status.UNAVAILABLE, RpcProgress.PROCESSED, true, new Metadata());
614
      promise.setFailure(InternalStatus.asRuntimeExceptionWithoutStacktrace(
1✔
615
              authorityVerificationStatus, null));
616
      return;
1✔
617
    }
618
    // No need to verify authority for the rpc outgoing header if it is same as the authority
619
    // for the transport
620
    if (!authority.contentEquals(authorityHeader)) {
1✔
621
      Status authorityVerificationStatus = peerVerificationResults.get(
1✔
622
              authorityHeader.toString());
1✔
623
      if (authorityVerificationStatus == null) {
1✔
624
        if (attributes.get(GrpcAttributes.ATTR_AUTHORITY_VERIFIER) == null) {
1✔
625
          authorityVerificationStatus = Status.UNAVAILABLE.withDescription(
1✔
626
                  "Authority verifier not found to verify authority");
627
          command.stream().setNonExistent();
1✔
628
          command.stream().transportReportStatus(
1✔
629
                  authorityVerificationStatus, RpcProgress.PROCESSED, true, new Metadata());
630
          promise.setFailure(InternalStatus.asRuntimeExceptionWithoutStacktrace(
1✔
631
                  authorityVerificationStatus, null));
632
          return;
1✔
633
        }
634
        authorityVerificationStatus = attributes.get(GrpcAttributes.ATTR_AUTHORITY_VERIFIER)
1✔
635
                .verifyAuthority(authorityHeader.toString());
1✔
636
        peerVerificationResults.put(authorityHeader.toString(), authorityVerificationStatus);
1✔
637
        if (!authorityVerificationStatus.isOk() && !enablePerRpcAuthorityCheck) {
1✔
638
          logger.log(Level.WARNING, String.format("%s.%s",
1✔
639
                          authorityVerificationStatus.getDescription(),
1✔
640
                          enablePerRpcAuthorityCheck
1✔
641
                                  ? "" : " This will be an error in the future."),
1✔
642
                  InternalStatus.asRuntimeExceptionWithoutStacktrace(
1✔
643
                          authorityVerificationStatus, null));
644
        }
645
      }
646
      if (!authorityVerificationStatus.isOk()) {
1✔
647
        if (enablePerRpcAuthorityCheck) {
1✔
648
          command.stream().setNonExistent();
1✔
649
          command.stream().transportReportStatus(
1✔
650
                  authorityVerificationStatus, RpcProgress.PROCESSED, true, new Metadata());
651
          promise.setFailure(InternalStatus.asRuntimeExceptionWithoutStacktrace(
1✔
652
                  authorityVerificationStatus, null));
653
          return;
1✔
654
        }
655
      }
656
    }
657
    // Get the stream ID for the new stream.
658
    int streamId;
659
    try {
660
      streamId = incrementAndGetNextStreamId();
1✔
661
    } catch (StatusException e) {
1✔
662
      command.stream().setNonExistent();
1✔
663
      // Stream IDs have been exhausted for this connection. Fail the promise immediately.
664
      promise.setFailure(e);
1✔
665

666
      // Initiate a graceful shutdown if we haven't already.
667
      if (!connection().goAwaySent()) {
1✔
668
        logger.fine("Stream IDs have been exhausted for this connection. "
1✔
669
                + "Initiating graceful shutdown of the connection.");
670
        lifecycleManager.notifyShutdown(e.getStatus());
1✔
671
        close(ctx(), ctx().newPromise());
1✔
672
      }
673
      return;
1✔
674
    }
1✔
675
    if (connection().goAwayReceived()) {
1✔
676
      Status s = abruptGoAwayStatus;
1✔
677
      int maxActiveStreams = connection().local().maxActiveStreams();
1✔
678
      int lastStreamId = connection().local().lastStreamKnownByPeer();
1✔
679
      if (s == null) {
1✔
680
        // Should be impossible, but handle pseudo-gracefully
681
        s = Status.INTERNAL.withDescription(
×
682
            "Failed due to abrupt GOAWAY, but can't find GOAWAY details");
683
      } else if (streamId > lastStreamId) {
1✔
684
        s = s.augmentDescription(
1✔
685
            "stream id: " + streamId + ", GOAWAY Last-Stream-ID:" + lastStreamId);
686
      } else if (connection().local().numActiveStreams() == maxActiveStreams) {
1✔
687
        s = s.augmentDescription("At MAX_CONCURRENT_STREAMS limit. limit: " + maxActiveStreams);
1✔
688
      }
689
      if (streamId > lastStreamId || connection().local().numActiveStreams() == maxActiveStreams) {
1✔
690
        // This should only be reachable during onGoAwayReceived, as otherwise
691
        // getShutdownThrowable() != null
692
        command.stream().setNonExistent();
1✔
693
        command.stream().transportReportStatus(s, RpcProgress.MISCARRIED, true, new Metadata());
1✔
694
        promise.setFailure(s.asRuntimeException());
1✔
695
        return;
1✔
696
      }
697
    }
698

699
    NettyClientStream.TransportState stream = command.stream();
1✔
700
    Http2Headers headers = command.headers();
1✔
701
    stream.setId(streamId);
1✔
702

703
    try (TaskCloseable ignore = PerfMark.traceTask("NettyClientHandler.createStream")) {
1✔
704
      PerfMark.linkIn(command.getLink());
1✔
705
      PerfMark.attachTag(stream.tag());
1✔
706
      createStreamTraced(
1✔
707
          streamId, stream, headers, command.isGet(), command.shouldBeCountedForInUse(), promise);
1✔
708
    }
709
  }
1✔
710

711
  private void createStreamTraced(
712
      final int streamId,
713
      final NettyClientStream.TransportState stream,
714
      final Http2Headers headers,
715
      boolean isGet,
716
      final boolean shouldBeCountedForInUse,
717
      final ChannelPromise promise) {
718
    // Create an intermediate promise so that we can intercept the failure reported back to the
719
    // application.
720
    ChannelPromise tempPromise = ctx().newPromise();
1✔
721
    encoder().writeHeaders(ctx(), streamId, headers, 0, isGet, tempPromise)
1✔
722
        .addListener(new ChannelFutureListener() {
1✔
723
          @Override
724
          public void operationComplete(ChannelFuture future) throws Exception {
725
            if (future.isSuccess()) {
1✔
726
              // The http2Stream will be null in case a stream buffered in the encoder
727
              // was canceled via RST_STREAM.
728
              Http2Stream http2Stream = connection().stream(streamId);
1✔
729
              if (http2Stream != null) {
1✔
730
                stream.getStatsTraceContext().clientOutboundHeaders();
1✔
731
                http2Stream.setProperty(streamKey, stream);
1✔
732

733
                // This delays the in-use state until the I/O completes, which technically may
734
                // be later than we would like.
735
                if (shouldBeCountedForInUse) {
1✔
736
                  inUseState.updateObjectInUse(http2Stream, true);
1✔
737
                }
738

739
                // Attach the client stream to the HTTP/2 stream object as user data.
740
                stream.setHttp2Stream(http2Stream);
1✔
741
                promise.setSuccess();
1✔
742
              } else {
743
                // Otherwise, the stream has been cancelled and Netty is sending a
744
                // RST_STREAM frame which causes it to purge pending writes from the
745
                // flow-controller and delete the http2Stream. The stream listener has already
746
                // been notified of cancellation so there is nothing to do.
747
                //
748
                // This process has been observed to fail in some circumstances, leaving listeners
749
                // unanswered. Ensure that some exception has been delivered consistent with the
750
                // implied RST_STREAM result above.
751
                Status status = Status.INTERNAL.withDescription("unknown stream for connection");
1✔
752
                promise.setFailure(status.asRuntimeException());
1✔
753
              }
754
            } else {
1✔
755
              Throwable cause = future.cause();
1✔
756
              if (cause instanceof StreamBufferingEncoder.Http2GoAwayException) {
1✔
757
                StreamBufferingEncoder.Http2GoAwayException e =
1✔
758
                    (StreamBufferingEncoder.Http2GoAwayException) cause;
759
                Status status = statusFromH2Error(
1✔
760
                    Status.Code.UNAVAILABLE, "GOAWAY closed buffered stream",
761
                    e.errorCode(), e.debugData());
1✔
762
                cause = status.asRuntimeException();
1✔
763
                stream.transportReportStatus(status, RpcProgress.MISCARRIED, true, new Metadata());
1✔
764
              } else if (cause instanceof StreamBufferingEncoder.Http2ChannelClosedException) {
1✔
765
                Status status = lifecycleManager.getShutdownStatus();
1✔
766
                if (status == null) {
1✔
767
                  status = Status.UNAVAILABLE.withCause(cause)
×
768
                      .withDescription("Connection closed while stream is buffered");
×
769
                }
770
                stream.transportReportStatus(status, RpcProgress.MISCARRIED, true, new Metadata());
1✔
771
              }
772
              promise.setFailure(cause);
1✔
773
            }
774
          }
1✔
775
        });
776
  }
1✔
777

778
  /**
779
   * Cancels this stream.
780
   */
781
  private void cancelStream(ChannelHandlerContext ctx, CancelClientStreamCommand cmd,
782
      ChannelPromise promise) {
783
    NettyClientStream.TransportState stream = cmd.stream();
1✔
784
    try (TaskCloseable ignore = PerfMark.traceTask("NettyClientHandler.cancelStream")) {
1✔
785
      PerfMark.attachTag(stream.tag());
1✔
786
      PerfMark.linkIn(cmd.getLink());
1✔
787
      Status reason = cmd.reason();
1✔
788
      if (reason != null) {
1✔
789
        stream.transportReportStatus(reason, true, new Metadata());
1✔
790
      }
791
      if (!cmd.stream().isNonExistent()) {
1✔
792
        encoder().writeRstStream(ctx, stream.id(), Http2Error.CANCEL.code(), promise);
1✔
793
      } else {
794
        promise.setSuccess();
1✔
795
      }
796
    }
797
  }
1✔
798

799
  /**
800
   * Sends the given GRPC frame for the stream.
801
   */
802
  private void sendGrpcFrame(ChannelHandlerContext ctx, SendGrpcFrameCommand cmd,
803
      ChannelPromise promise) {
804
    try (TaskCloseable ignore = PerfMark.traceTask("NettyClientHandler.sendGrpcFrame")) {
1✔
805
      PerfMark.attachTag(cmd.stream().tag());
1✔
806
      PerfMark.linkIn(cmd.getLink());
1✔
807
      // Call the base class to write the HTTP/2 DATA frame.
808
      // Note: no need to flush since this is handled by the outbound flow controller.
809
      encoder().writeData(ctx, cmd.stream().id(), cmd.content(), 0, cmd.endStream(), promise);
1✔
810
    }
811
  }
1✔
812

813
  private void sendPingFrame(ChannelHandlerContext ctx, SendPingCommand msg,
814
      ChannelPromise promise) {
815
    try (TaskCloseable ignore = PerfMark.traceTask("NettyClientHandler.sendPingFrame")) {
1✔
816
      PerfMark.linkIn(msg.getLink());
1✔
817
      sendPingFrameTraced(ctx, msg, promise);
1✔
818
    }
819
  }
1✔
820

821
  /**
822
   * Sends a PING frame. If a ping operation is already outstanding, the callback in the message is
823
   * registered to be called when the existing operation completes, and no new frame is sent.
824
   */
825
  private void sendPingFrameTraced(ChannelHandlerContext ctx, SendPingCommand msg,
826
      ChannelPromise promise) {
827
    // Don't check lifecycleManager.getShutdownStatus() since we want to allow pings after shutdown
828
    // but before termination. After termination, messages will no longer arrive because the
829
    // pipeline clears all handlers on channel close.
830

831
    PingCallback callback = msg.callback();
1✔
832
    Executor executor = msg.executor();
1✔
833
    // we only allow one outstanding ping at a time, so just add the callback to
834
    // any outstanding operation
835
    if (ping != null) {
1✔
836
      promise.setSuccess();
1✔
837
      ping.addCallback(callback, executor);
1✔
838
      return;
1✔
839
    }
840

841
    // Use a new promise to prevent calling the callback twice on write failure: here and in
842
    // NettyClientTransport.ping(). It may appear strange, but it will behave the same as if
843
    // ping != null above.
844
    promise.setSuccess();
1✔
845
    promise = ctx().newPromise();
1✔
846
    // set outstanding operation
847
    long data = USER_PING_PAYLOAD;
1✔
848
    Stopwatch stopwatch = stopwatchFactory.get();
1✔
849
    stopwatch.start();
1✔
850
    ping = new Http2Ping(data, stopwatch);
1✔
851
    ping.addCallback(callback, executor);
1✔
852
    // and then write the ping
853
    encoder().writePing(ctx, false, USER_PING_PAYLOAD, promise);
1✔
854
    ctx.flush();
1✔
855
    final Http2Ping finalPing = ping;
1✔
856
    promise.addListener(new ChannelFutureListener() {
1✔
857
      @Override
858
      public void operationComplete(ChannelFuture future) throws Exception {
859
        if (future.isSuccess()) {
1✔
860
          transportTracer.reportKeepAliveSent();
1✔
861
          return;
1✔
862
        }
863
        Throwable cause = future.cause();
×
864
        Status status = lifecycleManager.getShutdownStatus();
×
865
        if (cause instanceof ClosedChannelException) {
×
866
          if (status == null) {
×
867
            status = Status.UNKNOWN.withDescription("Ping failed but for unknown reason.")
×
868
                    .withCause(future.cause());
×
869
          }
870
        } else {
871
          status = Utils.statusFromThrowable(cause);
×
872
        }
873
        finalPing.failed(status);
×
874
        if (ping == finalPing) {
×
875
          ping = null;
×
876
        }
877
      }
×
878
    });
879
  }
1✔
880

881
  private void gracefulClose(ChannelHandlerContext ctx, GracefulCloseCommand msg,
882
      ChannelPromise promise) throws Exception {
883
    lifecycleManager.notifyShutdown(msg.getStatus());
1✔
884
    // Explicitly flush to create any buffered streams before sending GOAWAY.
885
    // TODO(ejona): determine if the need to flush is a bug in Netty
886
    flush(ctx);
1✔
887
    close(ctx, promise);
1✔
888
  }
1✔
889

890
  private void forcefulClose(final ChannelHandlerContext ctx, final ForcefulCloseCommand msg,
891
      ChannelPromise promise) throws Exception {
892
    connection().forEachActiveStream(new Http2StreamVisitor() {
1✔
893
      @Override
894
      public boolean visit(Http2Stream stream) throws Http2Exception {
895
        NettyClientStream.TransportState clientStream = clientStream(stream);
1✔
896
        Tag tag = clientStream != null ? clientStream.tag() : PerfMark.createTag();
1✔
897
        try (TaskCloseable ignore = PerfMark.traceTask("NettyClientHandler.forcefulClose")) {
1✔
898
          PerfMark.linkIn(msg.getLink());
1✔
899
          PerfMark.attachTag(tag);
1✔
900
          if (clientStream != null) {
1✔
901
            clientStream.transportReportStatus(msg.getStatus(), true, new Metadata());
1✔
902
            resetStream(ctx, stream.id(), Http2Error.CANCEL.code(), ctx.newPromise());
1✔
903
          }
904
          stream.close();
1✔
905
          return true;
1✔
906
        }
907
      }
908
    });
909
    close(ctx, promise);
1✔
910
  }
1✔
911

912
  /**
913
   * Handler for a GOAWAY being received. Fails any streams created after the
914
   * last known stream. May only be called during a read.
915
   */
916
  private void goingAway(long errorCode, byte[] debugData) {
917
    Status finalStatus = statusFromH2Error(
1✔
918
        Status.Code.UNAVAILABLE, "GOAWAY shut down transport", errorCode, debugData);
919
    lifecycleManager.notifyGracefulShutdown(finalStatus);
1✔
920
    abruptGoAwayStatus = statusFromH2Error(
1✔
921
        Status.Code.UNAVAILABLE, "Abrupt GOAWAY closed unsent stream", errorCode, debugData);
922
    // While this _should_ be UNAVAILABLE, Netty uses the wrong stream id in the GOAWAY when it
923
    // fails streams due to HPACK failures (e.g., header list too large). To be more conservative,
924
    // we assume any sent streams may be related to the GOAWAY. This should rarely impact users
925
    // since the main time servers should use abrupt GOAWAYs is if there is a protocol error, and if
926
    // there wasn't a protocol error the error code was probably NO_ERROR which is mapped to
927
    // UNAVAILABLE. https://github.com/netty/netty/issues/10670
928
    final Status abruptGoAwayStatusConservative = statusFromH2Error(
1✔
929
        null, "Abrupt GOAWAY closed sent stream", errorCode, debugData);
930
    final boolean mayBeHittingNettyBug = errorCode != Http2Error.NO_ERROR.code();
1✔
931
    // Try to allocate as many in-flight streams as possible, to reduce race window of
932
    // https://github.com/grpc/grpc-java/issues/2562 . To be of any help, the server has to
933
    // gracefully shut down the connection with two GOAWAYs. gRPC servers generally send a PING
934
    // after the first GOAWAY, so they can very precisely detect when the GOAWAY has been
935
    // processed and thus this processing must be in-line before processing additional reads.
936

937
    // This can cause reentrancy, but should be minor since it is normal to handle writes in
938
    // response to a read. Also, the call stack is rather shallow at this point
939
    clientWriteQueue.drainNow();
1✔
940
    if (lifecycleManager.notifyShutdown(finalStatus)) {
1✔
941
      // This is for the only RPCs that are actually covered by the GOAWAY error code. All other
942
      // RPCs were not observed by the remote and so should be UNAVAILABLE.
943
      channelInactiveReason = statusFromH2Error(
1✔
944
          null, "Connection closed after GOAWAY", errorCode, debugData);
945
    }
946

947
    final int lastKnownStream = connection().local().lastStreamKnownByPeer();
1✔
948
    try {
949
      connection().forEachActiveStream(new Http2StreamVisitor() {
1✔
950
        @Override
951
        public boolean visit(Http2Stream stream) throws Http2Exception {
952
          if (stream.id() > lastKnownStream) {
1✔
953
            NettyClientStream.TransportState clientStream = clientStream(stream);
1✔
954
            if (clientStream != null) {
1✔
955
              // RpcProgress _should_ be REFUSED, but are being conservative. See comment for
956
              // abruptGoAwayStatusConservative. This does reduce our ability to perform transparent
957
              // retries, but only if something else caused a connection failure.
958
              RpcProgress progress = mayBeHittingNettyBug
1✔
959
                  ? RpcProgress.PROCESSED
1✔
960
                  : RpcProgress.REFUSED;
1✔
961
              clientStream.transportReportStatus(
1✔
962
                  abruptGoAwayStatusConservative, progress, false, new Metadata());
963
            }
964
            stream.close();
1✔
965
          }
966
          return true;
1✔
967
        }
968
      });
969
    } catch (Http2Exception e) {
×
970
      throw new RuntimeException(e);
×
971
    }
1✔
972
  }
1✔
973

974
  private void cancelPing(Status s) {
975
    if (ping != null) {
1✔
976
      ping.failed(s);
1✔
977
      ping = null;
1✔
978
    }
979
  }
1✔
980

981
  /** If {@code statusCode} is non-null, it will be used instead of the http2 error code mapping. */
982
  private Status statusFromH2Error(
983
      Status.Code statusCode, String context, long errorCode, byte[] debugData) {
984
    Status status = GrpcUtil.Http2Error.statusForCode(errorCode);
1✔
985
    if (statusCode == null) {
1✔
986
      statusCode = status.getCode();
1✔
987
    }
988
    String debugString = "";
1✔
989
    if (debugData != null && debugData.length > 0) {
1✔
990
      // If a debug message was provided, use it.
991
      debugString = ", debug data: " + new String(debugData, UTF_8);
1✔
992
    }
993
    return statusCode.toStatus()
1✔
994
        .withDescription(context + ". " + status.getDescription() + debugString);
1✔
995
  }
996

997
  /**
998
   * Gets the client stream associated to the given HTTP/2 stream object.
999
   */
1000
  private NettyClientStream.TransportState clientStream(Http2Stream stream) {
1001
    return stream == null ? null : (NettyClientStream.TransportState) stream.getProperty(streamKey);
1✔
1002
  }
1003

1004
  private int incrementAndGetNextStreamId() throws StatusException {
1005
    int nextStreamId = connection().local().incrementAndGetNextStreamId();
1✔
1006
    if (nextStreamId < 0) {
1✔
1007
      logger.fine("Stream IDs have been exhausted for this connection. "
1✔
1008
              + "Initiating graceful shutdown of the connection.");
1009
      throw EXHAUSTED_STREAMS_STATUS.asException();
1✔
1010
    }
1011
    return nextStreamId;
1✔
1012
  }
1013

1014
  private Http2Stream requireHttp2Stream(int streamId) {
1015
    Http2Stream stream = connection().stream(streamId);
1✔
1016
    if (stream == null) {
1✔
1017
      // This should never happen.
1018
      throw new AssertionError("Stream does not exist: " + streamId);
×
1019
    }
1020
    return stream;
1✔
1021
  }
1022

1023
  private class FrameListener extends Http2FrameAdapter {
1✔
1024
    private boolean firstSettings = true;
1✔
1025

1026
    @Override
1027
    public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) {
1028
      if (firstSettings) {
1✔
1029
        firstSettings = false;
1✔
1030
        attributes = lifecycleManager.filterAttributes(attributes);
1✔
1031
        lifecycleManager.notifyReady();
1✔
1032
      }
1033
    }
1✔
1034

1035
    @Override
1036
    public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
1037
        boolean endOfStream) throws Http2Exception {
1038
      NettyClientHandler.this.onDataRead(streamId, data, padding, endOfStream);
1✔
1039
      return padding;
1✔
1040
    }
1041

1042
    @Override
1043
    public void onHeadersRead(ChannelHandlerContext ctx,
1044
        int streamId,
1045
        Http2Headers headers,
1046
        int streamDependency,
1047
        short weight,
1048
        boolean exclusive,
1049
        int padding,
1050
        boolean endStream) throws Http2Exception {
1051
      NettyClientHandler.this.onHeadersRead(streamId, headers, endStream);
1✔
1052
    }
1✔
1053

1054
    @Override
1055
    public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode)
1056
        throws Http2Exception {
1057
      NettyClientHandler.this.onRstStreamRead(streamId, errorCode);
1✔
1058
    }
1✔
1059

1060
    @Override
1061
    public void onPingAckRead(ChannelHandlerContext ctx, long ackPayload) throws Http2Exception {
1062
      Http2Ping p = ping;
1✔
1063
      if (ackPayload == flowControlPing().payload()) {
1✔
1064
        flowControlPing().updateWindow();
1✔
1065
        logger.log(Level.FINE, "Window: {0}",
1✔
1066
            decoder().flowController().initialWindowSize(connection().connectionStream()));
1✔
1067
      } else if (p != null) {
1✔
1068
        if (p.payload() == ackPayload) {
1✔
1069
          p.complete();
1✔
1070
          ping = null;
1✔
1071
        } else {
1072
          logger.log(Level.WARNING,
1✔
1073
              "Received unexpected ping ack. Expecting {0}, got {1}",
1074
              new Object[] {p.payload(), ackPayload});
1✔
1075
        }
1076
      } else {
1077
        logger.warning("Received unexpected ping ack. No ping outstanding");
×
1078
      }
1079
      if (keepAliveManager != null) {
1✔
1080
        keepAliveManager.onDataReceived();
1✔
1081
      }
1082
    }
1✔
1083

1084
    @Override
1085
    public void onPingRead(ChannelHandlerContext ctx, long data) throws Http2Exception {
1086
      if (keepAliveManager != null) {
1✔
1087
        keepAliveManager.onDataReceived();
×
1088
      }
1089
    }
1✔
1090
  }
1091

1092
  private static class PingCountingFrameWriter extends DecoratingHttp2FrameWriter
1093
      implements AbstractNettyHandler.PingLimiter {
1094
    private int pingCount;
1095

1096
    public PingCountingFrameWriter(Http2FrameWriter delegate) {
1097
      super(delegate);
1✔
1098
    }
1✔
1099

1100
    @Override
1101
    public boolean isPingAllowed() {
1102
      // "3 strikes" may cause the server to complain, so we limit ourselves to 2 or below.
1103
      return pingCount < 2;
1✔
1104
    }
1105

1106
    @Override
1107
    public ChannelFuture writeHeaders(
1108
        ChannelHandlerContext ctx, int streamId, Http2Headers headers,
1109
        int padding, boolean endStream, ChannelPromise promise) {
1110
      pingCount = 0;
×
1111
      return super.writeHeaders(ctx, streamId, headers, padding, endStream, promise);
×
1112
    }
1113

1114
    @Override
1115
    public ChannelFuture writeHeaders(
1116
        ChannelHandlerContext ctx, int streamId, Http2Headers headers,
1117
        int streamDependency, short weight, boolean exclusive,
1118
        int padding, boolean endStream, ChannelPromise promise) {
1119
      pingCount = 0;
1✔
1120
      return super.writeHeaders(ctx, streamId, headers, streamDependency, weight, exclusive,
1✔
1121
          padding, endStream, promise);
1122
    }
1123

1124
    @Override
1125
    public ChannelFuture writeWindowUpdate(
1126
        ChannelHandlerContext ctx, int streamId, int windowSizeIncrement, ChannelPromise promise) {
1127
      pingCount = 0;
1✔
1128
      return super.writeWindowUpdate(ctx, streamId, windowSizeIncrement, promise);
1✔
1129
    }
1130

1131
    @Override
1132
    public ChannelFuture writePing(
1133
        ChannelHandlerContext ctx, boolean ack, long data, ChannelPromise promise) {
1134
      if (!ack) {
1✔
1135
        pingCount++;
1✔
1136
      }
1137
      return super.writePing(ctx, ack, data, promise);
1✔
1138
    }
1139

1140
    @Override
1141
    public ChannelFuture writeData(
1142
        ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endStream,
1143
        ChannelPromise promise) {
1144
      if (data.isReadable()) {
1✔
1145
        pingCount = 0;
1✔
1146
      }
1147
      return super.writeData(ctx, streamId, data, padding, endStream, promise);
1✔
1148
    }
1149
  }
1150
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc