• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #20176

20 Feb 2026 03:31PM UTC coverage: 88.714% (+0.007%) from 88.707%
#20176

push

github

web-flow
netty: Preserve early server handshake failure cause in logs

Early server-side negotiation failures may terminate a transport before
NettyServerHandler is fully active in the pipeline. In those cases, the
original handshake failure can be missing from transport termination
logging because termination may rely on connectionError(), which can be
null on this early path.

This change adds a server-side NOOP write in
NettyServerTransport.start() (analogous to the existing client-side NOOP
write path). If that write fails, its cause is passed to
notifyTerminated(), preserving and logging the original transport
termination reason for debugging.

To support this, NettyServerHandler now accepts NOOP_MESSAGE writes by
writing an empty buffer, and tests are added to verify:
  - transport failure logging for plaintext-client to TLS-server failure
  - server NOOP write handling in NettyServerHandler

Fixes #8495

35459 of 39970 relevant lines covered (88.71%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.18
/../netty/src/main/java/io/grpc/netty/NettyServerHandler.java
1
/*
2
 * Copyright 2014 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.netty;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static io.grpc.internal.GrpcUtil.SERVER_KEEPALIVE_TIME_NANOS_DISABLED;
22
import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE;
23
import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_AGE_NANOS_DISABLED;
24
import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_IDLE_NANOS_DISABLED;
25
import static io.grpc.netty.Utils.CONTENT_TYPE_HEADER;
26
import static io.grpc.netty.Utils.HTTP_METHOD;
27
import static io.grpc.netty.Utils.TE_HEADER;
28
import static io.grpc.netty.Utils.TE_TRAILERS;
29
import static io.netty.handler.codec.http.HttpHeaderNames.CONNECTION;
30
import static io.netty.handler.codec.http.HttpHeaderNames.HOST;
31
import static io.netty.handler.codec.http2.DefaultHttp2LocalFlowController.DEFAULT_WINDOW_UPDATE_RATIO;
32
import static io.netty.handler.codec.http2.Http2Headers.PseudoHeaderName.AUTHORITY;
33

34
import com.google.common.annotations.VisibleForTesting;
35
import com.google.common.base.Preconditions;
36
import com.google.common.base.Strings;
37
import com.google.common.base.Ticker;
38
import io.grpc.Attributes;
39
import io.grpc.ChannelLogger;
40
import io.grpc.ChannelLogger.ChannelLogLevel;
41
import io.grpc.InternalChannelz;
42
import io.grpc.InternalMetadata;
43
import io.grpc.InternalStatus;
44
import io.grpc.Metadata;
45
import io.grpc.ServerStreamTracer;
46
import io.grpc.Status;
47
import io.grpc.internal.GrpcUtil;
48
import io.grpc.internal.KeepAliveEnforcer;
49
import io.grpc.internal.KeepAliveManager;
50
import io.grpc.internal.LogExceptionRunnable;
51
import io.grpc.internal.MaxConnectionIdleManager;
52
import io.grpc.internal.ServerTransportListener;
53
import io.grpc.internal.StatsTraceContext;
54
import io.grpc.internal.TransportTracer;
55
import io.grpc.netty.GrpcHttp2HeadersUtils.GrpcHttp2ServerHeadersDecoder;
56
import io.netty.buffer.ByteBuf;
57
import io.netty.buffer.ByteBufUtil;
58
import io.netty.buffer.Unpooled;
59
import io.netty.channel.ChannelFuture;
60
import io.netty.channel.ChannelFutureListener;
61
import io.netty.channel.ChannelHandlerContext;
62
import io.netty.channel.ChannelPromise;
63
import io.netty.handler.codec.http.HttpHeaderNames;
64
import io.netty.handler.codec.http2.DecoratingHttp2ConnectionEncoder;
65
import io.netty.handler.codec.http2.DecoratingHttp2FrameWriter;
66
import io.netty.handler.codec.http2.DefaultHttp2Connection;
67
import io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder;
68
import io.netty.handler.codec.http2.DefaultHttp2ConnectionEncoder;
69
import io.netty.handler.codec.http2.DefaultHttp2FrameReader;
70
import io.netty.handler.codec.http2.DefaultHttp2FrameWriter;
71
import io.netty.handler.codec.http2.DefaultHttp2Headers;
72
import io.netty.handler.codec.http2.DefaultHttp2HeadersEncoder;
73
import io.netty.handler.codec.http2.DefaultHttp2LocalFlowController;
74
import io.netty.handler.codec.http2.DefaultHttp2RemoteFlowController;
75
import io.netty.handler.codec.http2.EmptyHttp2Headers;
76
import io.netty.handler.codec.http2.Http2Connection;
77
import io.netty.handler.codec.http2.Http2ConnectionAdapter;
78
import io.netty.handler.codec.http2.Http2ConnectionDecoder;
79
import io.netty.handler.codec.http2.Http2ConnectionEncoder;
80
import io.netty.handler.codec.http2.Http2Error;
81
import io.netty.handler.codec.http2.Http2Exception;
82
import io.netty.handler.codec.http2.Http2Exception.StreamException;
83
import io.netty.handler.codec.http2.Http2FrameAdapter;
84
import io.netty.handler.codec.http2.Http2FrameLogger;
85
import io.netty.handler.codec.http2.Http2FrameReader;
86
import io.netty.handler.codec.http2.Http2FrameWriter;
87
import io.netty.handler.codec.http2.Http2Headers;
88
import io.netty.handler.codec.http2.Http2HeadersDecoder;
89
import io.netty.handler.codec.http2.Http2HeadersEncoder;
90
import io.netty.handler.codec.http2.Http2InboundFrameLogger;
91
import io.netty.handler.codec.http2.Http2LifecycleManager;
92
import io.netty.handler.codec.http2.Http2OutboundFrameLogger;
93
import io.netty.handler.codec.http2.Http2Settings;
94
import io.netty.handler.codec.http2.Http2Stream;
95
import io.netty.handler.codec.http2.Http2StreamVisitor;
96
import io.netty.handler.codec.http2.UniformStreamByteDistributor;
97
import io.netty.handler.logging.LogLevel;
98
import io.netty.util.AsciiString;
99
import io.netty.util.ReferenceCountUtil;
100
import io.perfmark.PerfMark;
101
import io.perfmark.Tag;
102
import io.perfmark.TaskCloseable;
103
import java.text.MessageFormat;
104
import java.util.List;
105
import java.util.concurrent.Future;
106
import java.util.concurrent.ScheduledFuture;
107
import java.util.concurrent.TimeUnit;
108
import java.util.logging.Level;
109
import java.util.logging.Logger;
110
import javax.annotation.CheckForNull;
111
import javax.annotation.Nullable;
112

113
/**
114
 * Server-side Netty handler for GRPC processing. All event handlers are executed entirely within
115
 * the context of the Netty Channel thread.
116
 */
117
class NettyServerHandler extends AbstractNettyHandler {
118
  private static final Logger logger = Logger.getLogger(NettyServerHandler.class.getName());
1✔
119
  private static final long KEEPALIVE_PING = 0xDEADL;
120
  @VisibleForTesting
121
  static final long GRACEFUL_SHUTDOWN_PING = 0x97ACEF001L;
122
  private static final long GRACEFUL_SHUTDOWN_PING_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(10);
1✔
123
  /** Temporary workaround for #8674. Fine to delete after v1.45 release, and maybe earlier. */
124
  private static final boolean DISABLE_CONNECTION_HEADER_CHECK = Boolean.parseBoolean(
1✔
125
      System.getProperty("io.grpc.netty.disableConnectionHeaderCheck", "false"));
1✔
126

127
  /**
128
   * A message that simply passes through the channel without any real processing. It is useful to
129
   * check if buffers have been drained and test the health of the channel in a single operation.
130
   */
131
  static final Object NOOP_MESSAGE = new Object();
1✔
132

133
  private final Http2Connection.PropertyKey streamKey;
134
  private final ServerTransportListener transportListener;
135
  private final int maxMessageSize;
136
  private final long keepAliveTimeInNanos;
137
  private final long keepAliveTimeoutInNanos;
138
  private final long maxConnectionAgeInNanos;
139
  private final long maxConnectionAgeGraceInNanos;
140
  private final RstStreamCounter rstStreamCounter;
141
  private final List<? extends ServerStreamTracer.Factory> streamTracerFactories;
142
  private final TransportTracer transportTracer;
143
  private final KeepAliveEnforcer keepAliveEnforcer;
144
  private final Attributes eagAttributes;
145
  /** Incomplete attributes produced by negotiator. */
146
  private Attributes negotiationAttributes;
147
  private InternalChannelz.Security securityInfo;
148
  /** Completed attributes produced by transportReady. */
149
  private Attributes attributes;
150
  private Throwable connectionError;
151
  private boolean teWarningLogged;
152
  private WriteQueue serverWriteQueue;
153
  private AsciiString lastKnownAuthority;
154
  @CheckForNull
155
  private KeepAliveManager keepAliveManager;
156
  @CheckForNull
157
  private MaxConnectionIdleManager maxConnectionIdleManager;
158
  @CheckForNull
159
  private ScheduledFuture<?> maxConnectionAgeMonitor;
160
  @CheckForNull
161
  private GracefulShutdown gracefulShutdown;
162

163
  static NettyServerHandler newHandler(
164
      ServerTransportListener transportListener,
165
      ChannelPromise channelUnused,
166
      List<? extends ServerStreamTracer.Factory> streamTracerFactories,
167
      TransportTracer transportTracer,
168
      int maxStreams,
169
      boolean autoFlowControl,
170
      int flowControlWindow,
171
      int maxHeaderListSize,
172
      int softLimitHeaderListSize,
173
      int maxMessageSize,
174
      long keepAliveTimeInNanos,
175
      long keepAliveTimeoutInNanos,
176
      long maxConnectionIdleInNanos,
177
      long maxConnectionAgeInNanos,
178
      long maxConnectionAgeGraceInNanos,
179
      boolean permitKeepAliveWithoutCalls,
180
      long permitKeepAliveTimeInNanos,
181
      int maxRstCount,
182
      long maxRstPeriodNanos,
183
      Attributes eagAttributes) {
184
    Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive: %s",
1✔
185
        maxHeaderListSize);
186
    Http2FrameLogger frameLogger = new Http2FrameLogger(LogLevel.DEBUG, NettyServerHandler.class);
1✔
187
    Http2HeadersDecoder headersDecoder = new GrpcHttp2ServerHeadersDecoder(maxHeaderListSize);
1✔
188
    Http2FrameReader frameReader = new Http2InboundFrameLogger(
1✔
189
        new DefaultHttp2FrameReader(headersDecoder), frameLogger);
190
    Http2HeadersEncoder encoder = new DefaultHttp2HeadersEncoder(
1✔
191
        Http2HeadersEncoder.NEVER_SENSITIVE, false, 16, Integer.MAX_VALUE);
192
    Http2FrameWriter frameWriter =
1✔
193
        new Http2OutboundFrameLogger(new DefaultHttp2FrameWriter(encoder), frameLogger);
194
    return newHandler(
1✔
195
        channelUnused,
196
        frameReader,
197
        frameWriter,
198
        transportListener,
199
        streamTracerFactories,
200
        transportTracer,
201
        maxStreams,
202
        autoFlowControl,
203
        flowControlWindow,
204
        maxHeaderListSize,
205
        softLimitHeaderListSize,
206
        maxMessageSize,
207
        keepAliveTimeInNanos,
208
        keepAliveTimeoutInNanos,
209
        maxConnectionIdleInNanos,
210
        maxConnectionAgeInNanos,
211
        maxConnectionAgeGraceInNanos,
212
        permitKeepAliveWithoutCalls,
213
        permitKeepAliveTimeInNanos,
214
        maxRstCount,
215
        maxRstPeriodNanos,
216
        eagAttributes,
217
        Ticker.systemTicker());
1✔
218
  }
219

220
  static NettyServerHandler newHandler(
221
      ChannelPromise channelUnused,
222
      Http2FrameReader frameReader,
223
      Http2FrameWriter frameWriter,
224
      ServerTransportListener transportListener,
225
      List<? extends ServerStreamTracer.Factory> streamTracerFactories,
226
      TransportTracer transportTracer,
227
      int maxStreams,
228
      boolean autoFlowControl,
229
      int flowControlWindow,
230
      int maxHeaderListSize,
231
      int softLimitHeaderListSize,
232
      int maxMessageSize,
233
      long keepAliveTimeInNanos,
234
      long keepAliveTimeoutInNanos,
235
      long maxConnectionIdleInNanos,
236
      long maxConnectionAgeInNanos,
237
      long maxConnectionAgeGraceInNanos,
238
      boolean permitKeepAliveWithoutCalls,
239
      long permitKeepAliveTimeInNanos,
240
      int maxRstCount,
241
      long maxRstPeriodNanos,
242
      Attributes eagAttributes,
243
      Ticker ticker) {
244
    Preconditions.checkArgument(maxStreams > 0, "maxStreams must be positive: %s", maxStreams);
1✔
245
    Preconditions.checkArgument(flowControlWindow > 0, "flowControlWindow must be positive: %s",
1✔
246
        flowControlWindow);
247
    Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive: %s",
1✔
248
        maxHeaderListSize);
249
    Preconditions.checkArgument(
1✔
250
        softLimitHeaderListSize > 0, "softLimitHeaderListSize must be positive: %s",
251
        softLimitHeaderListSize);
252
    Preconditions.checkArgument(maxMessageSize > 0, "maxMessageSize must be positive: %s",
1✔
253
        maxMessageSize);
254

255
    final Http2Connection connection = new DefaultHttp2Connection(true);
1✔
256
    UniformStreamByteDistributor dist = new UniformStreamByteDistributor(connection);
1✔
257
    dist.minAllocationChunk(MIN_ALLOCATED_CHUNK); // Increased for benchmarks performance.
1✔
258
    DefaultHttp2RemoteFlowController controller =
1✔
259
        new DefaultHttp2RemoteFlowController(connection, dist);
260
    connection.remote().flowController(controller);
1✔
261
    final KeepAliveEnforcer keepAliveEnforcer = new KeepAliveEnforcer(
1✔
262
        permitKeepAliveWithoutCalls, permitKeepAliveTimeInNanos, TimeUnit.NANOSECONDS);
263

264
    if (ticker == null) {
1✔
265
      ticker = Ticker.systemTicker();
×
266
    }
267

268
    RstStreamCounter rstStreamCounter
1✔
269
        = new RstStreamCounter(maxRstCount, maxRstPeriodNanos, ticker);
270
    // Create the local flow controller configured to auto-refill the connection window.
271
    connection.local().flowController(
1✔
272
        new DefaultHttp2LocalFlowController(connection, DEFAULT_WINDOW_UPDATE_RATIO, true));
273
    frameWriter = new WriteMonitoringFrameWriter(frameWriter, keepAliveEnforcer);
1✔
274
    Http2ConnectionEncoder encoder =
1✔
275
        new DefaultHttp2ConnectionEncoder(connection, frameWriter);
276
    encoder = new Http2ControlFrameLimitEncoder(encoder, 10000);
1✔
277
    encoder = new Http2RstCounterEncoder(encoder, rstStreamCounter);
1✔
278
    Http2ConnectionDecoder decoder = new DefaultHttp2ConnectionDecoder(connection, encoder,
1✔
279
        frameReader);
280

281
    Http2Settings settings = new Http2Settings();
1✔
282
    settings.initialWindowSize(flowControlWindow);
1✔
283
    settings.maxConcurrentStreams(maxStreams);
1✔
284
    settings.maxHeaderListSize(maxHeaderListSize);
1✔
285

286
    return new NettyServerHandler(
1✔
287
        channelUnused,
288
        connection,
289
        transportListener,
290
        streamTracerFactories,
291
        transportTracer,
292
        decoder, encoder, settings,
293
        maxMessageSize,
294
        maxHeaderListSize,
295
        softLimitHeaderListSize,
296
        keepAliveTimeInNanos,
297
        keepAliveTimeoutInNanos,
298
        maxConnectionIdleInNanos,
299
        maxConnectionAgeInNanos, maxConnectionAgeGraceInNanos,
300
        keepAliveEnforcer,
301
        autoFlowControl,
302
        rstStreamCounter,
303
        eagAttributes, ticker);
304
  }
305

306
  private NettyServerHandler(
307
      ChannelPromise channelUnused,
308
      final Http2Connection connection,
309
      ServerTransportListener transportListener,
310
      List<? extends ServerStreamTracer.Factory> streamTracerFactories,
311
      TransportTracer transportTracer,
312
      Http2ConnectionDecoder decoder,
313
      Http2ConnectionEncoder encoder,
314
      Http2Settings settings,
315
      int maxMessageSize,
316
      int maxHeaderListSize,
317
      int softLimitHeaderListSize,
318
      long keepAliveTimeInNanos,
319
      long keepAliveTimeoutInNanos,
320
      long maxConnectionIdleInNanos,
321
      long maxConnectionAgeInNanos,
322
      long maxConnectionAgeGraceInNanos,
323
      final KeepAliveEnforcer keepAliveEnforcer,
324
      boolean autoFlowControl,
325
      RstStreamCounter rstStreamCounter,
326
      Attributes eagAttributes,
327
      Ticker ticker) {
328
    super(
1✔
329
        channelUnused,
330
        decoder,
331
        encoder,
332
        settings,
333
        new ServerChannelLogger(),
334
        autoFlowControl,
335
        null,
336
        ticker,
337
        maxHeaderListSize,
338
        softLimitHeaderListSize);
339

340
    final MaxConnectionIdleManager maxConnectionIdleManager;
341
    if (maxConnectionIdleInNanos == MAX_CONNECTION_IDLE_NANOS_DISABLED) {
1✔
342
      maxConnectionIdleManager = null;
1✔
343
    } else {
344
      maxConnectionIdleManager = new MaxConnectionIdleManager(maxConnectionIdleInNanos);
1✔
345
    }
346

347
    connection.addListener(new Http2ConnectionAdapter() {
1✔
348
      @Override
349
      public void onStreamActive(Http2Stream stream) {
350
        if (connection.numActiveStreams() == 1) {
1✔
351
          keepAliveEnforcer.onTransportActive();
1✔
352
          if (maxConnectionIdleManager != null) {
1✔
353
            maxConnectionIdleManager.onTransportActive();
1✔
354
          }
355
        }
356
      }
1✔
357

358
      @Override
359
      public void onStreamClosed(Http2Stream stream) {
360
        if (connection.numActiveStreams() == 0) {
1✔
361
          keepAliveEnforcer.onTransportIdle();
1✔
362
          if (maxConnectionIdleManager != null) {
1✔
363
            maxConnectionIdleManager.onTransportIdle();
1✔
364
          }
365
        }
366
      }
1✔
367
    });
368

369
    checkArgument(maxMessageSize >= 0, "maxMessageSize must be non-negative: %s", maxMessageSize);
1✔
370
    this.maxMessageSize = maxMessageSize;
1✔
371
    this.keepAliveTimeInNanos = keepAliveTimeInNanos;
1✔
372
    this.keepAliveTimeoutInNanos = keepAliveTimeoutInNanos;
1✔
373
    this.maxConnectionIdleManager = maxConnectionIdleManager;
1✔
374
    this.maxConnectionAgeInNanos = maxConnectionAgeInNanos;
1✔
375
    this.maxConnectionAgeGraceInNanos = maxConnectionAgeGraceInNanos;
1✔
376
    this.keepAliveEnforcer = checkNotNull(keepAliveEnforcer, "keepAliveEnforcer");
1✔
377
    this.rstStreamCounter = rstStreamCounter;
1✔
378
    this.eagAttributes = checkNotNull(eagAttributes, "eagAttributes");
1✔
379

380
    streamKey = encoder.connection().newKey();
1✔
381
    this.transportListener = checkNotNull(transportListener, "transportListener");
1✔
382
    this.streamTracerFactories = checkNotNull(streamTracerFactories, "streamTracerFactories");
1✔
383
    this.transportTracer = checkNotNull(transportTracer, "transportTracer");
1✔
384
    // Set the frame listener on the decoder.
385
    decoder().frameListener(new FrameListener());
1✔
386
  }
1✔
387

388
  @Nullable
389
  Throwable connectionError() {
390
    return connectionError;
1✔
391
  }
392

393
  @Override
394
  public void handlerAdded(final ChannelHandlerContext ctx) throws Exception {
395
    serverWriteQueue = new WriteQueue(ctx.channel());
1✔
396

397
    // init max connection age monitor
398
    if (maxConnectionAgeInNanos != MAX_CONNECTION_AGE_NANOS_DISABLED) {
1✔
399
      maxConnectionAgeMonitor = ctx.executor().schedule(
1✔
400
          new LogExceptionRunnable(new Runnable() {
1✔
401
            @Override
402
            public void run() {
403
              if (gracefulShutdown == null) {
1✔
404
                gracefulShutdown = new GracefulShutdown("max_age", maxConnectionAgeGraceInNanos);
1✔
405
                gracefulShutdown.start(ctx);
1✔
406
                ctx.flush();
1✔
407
              }
408
            }
1✔
409
          }),
410
          maxConnectionAgeInNanos,
411
          TimeUnit.NANOSECONDS);
412
    }
413

414
    if (maxConnectionIdleManager != null) {
1✔
415
      maxConnectionIdleManager.start(new Runnable() {
1✔
416
        @Override
417
        public void run() {
418
          if (gracefulShutdown == null) {
1✔
419
            gracefulShutdown = new GracefulShutdown("max_idle", null);
1✔
420
            gracefulShutdown.start(ctx);
1✔
421
            ctx.flush();
1✔
422
          }
423
        }
1✔
424
      }, ctx.executor());
1✔
425
    }
426

427
    if (keepAliveTimeInNanos != SERVER_KEEPALIVE_TIME_NANOS_DISABLED) {
1✔
428
      keepAliveManager = new KeepAliveManager(new KeepAlivePinger(ctx), ctx.executor(),
1✔
429
          keepAliveTimeInNanos, keepAliveTimeoutInNanos, true /* keepAliveDuringTransportIdle */);
430
      keepAliveManager.onTransportStarted();
1✔
431
    }
432

433
    assert encoder().connection().equals(decoder().connection());
1✔
434
    transportTracer.setFlowControlWindowReader(new Utils.FlowControlReader(encoder().connection()));
1✔
435

436
    super.handlerAdded(ctx);
1✔
437
  }
1✔
438

439
  private void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers)
440
      throws Http2Exception {
441
    try {
442
      // Connection-specific header fields makes a request malformed. Ideally this would be handled
443
      // by Netty. RFC 7540 section 8.1.2.2
444
      if (!DISABLE_CONNECTION_HEADER_CHECK && headers.contains(CONNECTION)) {
1✔
445
        resetStream(ctx, streamId, Http2Error.PROTOCOL_ERROR.code(), ctx.newPromise());
×
446
        return;
×
447
      }
448

449
      if (headers.authority() == null) {
1✔
450
        List<CharSequence> hosts = headers.getAll(HOST);
1✔
451
        if (hosts.size() > 1) {
1✔
452
          // RFC 7230 section 5.4
453
          respondWithHttpError(ctx, streamId, 400, Status.Code.INTERNAL,
1✔
454
              "Multiple host headers");
455
          return;
1✔
456
        }
457
        if (!hosts.isEmpty()) {
1✔
458
          headers.add(AUTHORITY.value(), hosts.get(0));
1✔
459
        }
460
      }
461
      headers.remove(HOST);
1✔
462

463
      // Remove the leading slash of the path and get the fully qualified method name
464
      CharSequence path = headers.path();
1✔
465

466
      if (path == null) {
1✔
467
        respondWithHttpError(ctx, streamId, 404, Status.Code.UNIMPLEMENTED,
1✔
468
            "Expected path but is missing");
469
        return;
1✔
470
      }
471

472
      if (path.charAt(0) != '/') {
1✔
473
        respondWithHttpError(ctx, streamId, 404, Status.Code.UNIMPLEMENTED,
1✔
474
            String.format("Expected path to start with /: %s", path));
1✔
475
        return;
1✔
476
      }
477

478
      String method = path.subSequence(1, path.length()).toString();
1✔
479

480
      // Verify that the Content-Type is correct in the request.
481
      CharSequence contentType = headers.get(CONTENT_TYPE_HEADER);
1✔
482
      if (contentType == null) {
1✔
483
        respondWithHttpError(
1✔
484
            ctx, streamId, 415, Status.Code.INTERNAL, "Content-Type is missing from the request");
485
        return;
1✔
486
      }
487
      String contentTypeString = contentType.toString();
1✔
488
      if (!GrpcUtil.isGrpcContentType(contentTypeString)) {
1✔
489
        respondWithHttpError(ctx, streamId, 415, Status.Code.INTERNAL,
1✔
490
            String.format("Content-Type '%s' is not supported", contentTypeString));
1✔
491
        return;
1✔
492
      }
493

494
      if (!HTTP_METHOD.contentEquals(headers.method())) {
1✔
495
        Http2Headers extraHeaders = new DefaultHttp2Headers();
1✔
496
        extraHeaders.add(HttpHeaderNames.ALLOW, HTTP_METHOD);
1✔
497
        respondWithHttpError(ctx, streamId, 405, Status.Code.INTERNAL,
1✔
498
            String.format("Method '%s' is not supported", headers.method()), extraHeaders);
1✔
499
        return;
1✔
500
      }
501

502
      int h2HeadersSize = Utils.getH2HeadersSize(headers);
1✔
503
      if (Utils.shouldRejectOnMetadataSizeSoftLimitExceeded(
1✔
504
              h2HeadersSize, softLimitHeaderListSize, maxHeaderListSize)) {
505
        respondWithHttpError(ctx, streamId, 431, Status.Code.RESOURCE_EXHAUSTED, String.format(
×
506
                "Client Headers of size %d exceeded Metadata size soft limit: %d",
507
                h2HeadersSize,
×
508
                softLimitHeaderListSize));
×
509
        return;
×
510
      }
511

512
      if (!teWarningLogged && !TE_TRAILERS.contentEquals(headers.get(TE_HEADER))) {
1✔
513
        logger.warning(String.format("Expected header TE: %s, but %s is received. This means "
1✔
514
                + "some intermediate proxy may not support trailers",
515
            TE_TRAILERS, headers.get(TE_HEADER)));
1✔
516
        teWarningLogged = true;
1✔
517
      }
518

519
      // The Http2Stream object was put by AbstractHttp2ConnectionHandler before calling this
520
      // method.
521
      Http2Stream http2Stream = requireHttp2Stream(streamId);
1✔
522

523
      Metadata metadata = Utils.convertHeaders(headers);
1✔
524
      StatsTraceContext statsTraceCtx =
1✔
525
          StatsTraceContext.newServerContext(streamTracerFactories, method, metadata);
1✔
526

527
      NettyServerStream.TransportState state = new NettyServerStream.TransportState(
1✔
528
          this,
529
          ctx.channel().eventLoop(),
1✔
530
          http2Stream,
531
          maxMessageSize,
532
          statsTraceCtx,
533
          transportTracer,
534
          method);
535

536
      try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.onHeadersRead")) {
1✔
537
        PerfMark.attachTag(state.tag());
1✔
538
        String authority = getOrUpdateAuthority((AsciiString) headers.authority());
1✔
539
        NettyServerStream stream = new NettyServerStream(
1✔
540
            ctx.channel(),
1✔
541
            state,
542
            attributes,
543
            authority,
544
            statsTraceCtx);
545
        transportListener.streamCreated(stream, method, metadata);
1✔
546
        state.onStreamAllocated();
1✔
547
        http2Stream.setProperty(streamKey, state);
1✔
548
      }
549
    } catch (Exception e) {
×
550
      logger.log(Level.WARNING, "Exception in onHeadersRead()", e);
×
551
      // Throw an exception that will get handled by onStreamError.
552
      throw newStreamException(streamId, e);
×
553
    }
1✔
554
  }
1✔
555

556
  private String getOrUpdateAuthority(AsciiString authority) {
557
    if (authority == null) {
1✔
558
      return null;
1✔
559
    } else if (!authority.equals(lastKnownAuthority)) {
1✔
560
      lastKnownAuthority = authority;
1✔
561
    }
562

563
    // AsciiString.toString() is internally cached, so subsequent calls will not
564
    // result in recomputing the String representation of lastKnownAuthority.
565
    return lastKnownAuthority.toString();
1✔
566
  }
567

568
  private void onDataRead(int streamId, ByteBuf data, int padding, boolean endOfStream)
569
      throws Http2Exception {
570
    flowControlPing().onDataRead(data.readableBytes(), padding);
1✔
571
    try {
572
      NettyServerStream.TransportState stream = serverStream(requireHttp2Stream(streamId));
1✔
573
      if (stream == null) {
1✔
574
        return;
1✔
575
      }
576
      try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.onDataRead")) {
1✔
577
        PerfMark.attachTag(stream.tag());
1✔
578
        stream.inboundDataReceived(data, endOfStream);
1✔
579
      }
580
    } catch (Throwable e) {
×
581
      logger.log(Level.WARNING, "Exception in onDataRead()", e);
×
582
      // Throw an exception that will get handled by onStreamError.
583
      throw newStreamException(streamId, e);
×
584
    }
1✔
585
  }
1✔
586

587
  private void onRstStreamRead(int streamId, long errorCode) throws Http2Exception {
588
    Http2Exception tooManyRstStream = rstStreamCounter.countRstStream();
1✔
589
    if (tooManyRstStream != null) {
1✔
590
      throw tooManyRstStream;
1✔
591
    }
592

593
    try {
594
      NettyServerStream.TransportState stream = serverStream(connection().stream(streamId));
1✔
595
      if (stream != null) {
1✔
596
        try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.onRstStreamRead")) {
1✔
597
          PerfMark.attachTag(stream.tag());
1✔
598
          stream.transportReportStatus(
1✔
599
              Status.CANCELLED.withDescription("RST_STREAM received for code " + errorCode));
1✔
600
        }
601
      }
602
    } catch (Throwable e) {
×
603
      logger.log(Level.WARNING, "Exception in onRstStreamRead()", e);
×
604
      // Throw an exception that will get handled by onStreamError.
605
      throw newStreamException(streamId, e);
×
606
    }
1✔
607
  }
1✔
608

609
  @Override
610
  protected void onConnectionError(ChannelHandlerContext ctx, boolean outbound, Throwable cause,
611
      Http2Exception http2Ex) {
612
    logger.log(Level.FINE, "Connection Error", cause);
1✔
613
    connectionError = cause;
1✔
614
    super.onConnectionError(ctx, outbound, cause, http2Ex);
1✔
615
  }
1✔
616

617
  @Override
618
  protected void onStreamError(ChannelHandlerContext ctx, boolean outbound, Throwable cause,
619
      StreamException http2Ex) {
620
    NettyServerStream.TransportState serverStream = serverStream(
1✔
621
        connection().stream(Http2Exception.streamId(http2Ex)));
1✔
622
    Level level = Level.WARNING;
1✔
623
    if (serverStream == null && http2Ex.error() == Http2Error.STREAM_CLOSED) {
1✔
624
      level = Level.FINE;
1✔
625
    }
626
    logger.log(level, "Stream Error", cause);
1✔
627
    Tag tag = serverStream != null ? serverStream.tag() : PerfMark.createTag();
1✔
628
    try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.onStreamError")) {
1✔
629
      PerfMark.attachTag(tag);
1✔
630
      if (serverStream != null) {
1✔
631
        serverStream.transportReportStatus(Utils.statusFromThrowable(cause));
1✔
632
      }
633
      // TODO(ejona): Abort the stream by sending headers to help the client with debugging.
634
      // Delegate to the base class to send a RST_STREAM.
635
      super.onStreamError(ctx, outbound, cause, http2Ex);
1✔
636
    }
637
  }
1✔
638

639
  @Override
640
  public void handleProtocolNegotiationCompleted(
641
      Attributes attrs, InternalChannelz.Security securityInfo) {
642
    negotiationAttributes = attrs;
1✔
643
    this.securityInfo = securityInfo;
1✔
644
    super.handleProtocolNegotiationCompleted(attrs, securityInfo);
1✔
645
    NettyClientHandler.writeBufferingAndRemove(ctx().channel());
1✔
646
  }
1✔
647

648
  @Override
649
  public Attributes getEagAttributes() {
650
    return eagAttributes;
1✔
651
  }
652

653
  InternalChannelz.Security getSecurityInfo() {
654
    return securityInfo;
1✔
655
  }
656

657
  @VisibleForTesting
658
  KeepAliveManager getKeepAliveManagerForTest() {
659
    return keepAliveManager;
1✔
660
  }
661

662
  @VisibleForTesting
663
  void setKeepAliveManagerForTest(KeepAliveManager keepAliveManager) {
664
    this.keepAliveManager = keepAliveManager;
1✔
665
  }
1✔
666

667
  /**
668
   * Handler for the Channel shutting down.
669
   */
670
  @Override
671
  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
672
    try {
673
      if (keepAliveManager != null) {
1✔
674
        keepAliveManager.onTransportTermination();
1✔
675
      }
676
      if (maxConnectionIdleManager != null) {
1✔
677
        maxConnectionIdleManager.onTransportTermination();
1✔
678
      }
679
      if (maxConnectionAgeMonitor != null) {
1✔
680
        maxConnectionAgeMonitor.cancel(false);
1✔
681
      }
682
      final Status status =
1✔
683
          Status.UNAVAILABLE.withDescription("connection terminated for unknown reason");
1✔
684
      // Any streams that are still active must be closed
685
      connection().forEachActiveStream(new Http2StreamVisitor() {
1✔
686
        @Override
687
        public boolean visit(Http2Stream stream) throws Http2Exception {
688
          NettyServerStream.TransportState serverStream = serverStream(stream);
1✔
689
          if (serverStream != null) {
1✔
690
            serverStream.transportReportStatus(status);
1✔
691
          }
692
          return true;
1✔
693
        }
694
      });
695
    } finally {
696
      super.channelInactive(ctx);
1✔
697
    }
698
  }
1✔
699

700
  WriteQueue getWriteQueue() {
701
    return serverWriteQueue;
1✔
702
  }
703

704
  /** Handler for commands sent from the stream. */
705
  @Override
706
  public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
707
      throws Exception {
708
    if (msg instanceof SendGrpcFrameCommand) {
1✔
709
      sendGrpcFrame(ctx, (SendGrpcFrameCommand) msg, promise);
1✔
710
    } else if (msg instanceof SendResponseHeadersCommand) {
1✔
711
      sendResponseHeaders(ctx, (SendResponseHeadersCommand) msg, promise);
1✔
712
    } else if (msg instanceof CancelServerStreamCommand) {
1✔
713
      cancelStream(ctx, (CancelServerStreamCommand) msg, promise);
1✔
714
    } else if (msg instanceof GracefulServerCloseCommand) {
1✔
715
      gracefulClose(ctx, (GracefulServerCloseCommand) msg, promise);
1✔
716
    } else if (msg instanceof ForcefulCloseCommand) {
1✔
717
      forcefulClose(ctx, (ForcefulCloseCommand) msg, promise);
1✔
718
    } else if (msg == NOOP_MESSAGE) {
1✔
719
      ctx.write(Unpooled.EMPTY_BUFFER, promise);
1✔
720
    } else {
721
      AssertionError e =
×
722
          new AssertionError("Write called for unexpected type: " + msg.getClass().getName());
×
723
      ReferenceCountUtil.release(msg);
×
724
      promise.setFailure(e);
×
725
      throw e;
×
726
    }
727
  }
1✔
728

729
  @Override
730
  public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
731
    gracefulClose(ctx, new GracefulServerCloseCommand("app_requested"), promise);
1✔
732
    ctx.flush();
1✔
733
  }
1✔
734

735
  /**
736
   * Returns the given processed bytes back to inbound flow control.
737
   */
738
  void returnProcessedBytes(Http2Stream http2Stream, int bytes) {
739
    try {
740
      decoder().flowController().consumeBytes(http2Stream, bytes);
1✔
741
    } catch (Http2Exception e) {
×
742
      throw new RuntimeException(e);
×
743
    }
1✔
744
  }
1✔
745

746
  private void closeStreamWhenDone(ChannelPromise promise, Http2Stream stream) {
747
    promise.addListener(
1✔
748
        new ChannelFutureListener() {
1✔
749
          @Override
750
          public void operationComplete(ChannelFuture future) {
751
            serverStream(stream).complete();
1✔
752
          }
1✔
753
        });
754
  }
1✔
755

756
  private static void streamGone(int streamId, ChannelPromise promise) {
757
    promise.setFailure(
1✔
758
        new IllegalStateException(
759
            "attempting to write to stream " + streamId + " that no longer exists") {
1✔
760
          @Override
761
          public synchronized Throwable fillInStackTrace() {
762
            return this;
1✔
763
          }
764
        });
765
  }
1✔
766

767
  /** Sends the given gRPC frame to the client. */
768
  private void sendGrpcFrame(
769
      ChannelHandlerContext ctx, SendGrpcFrameCommand cmd, ChannelPromise promise)
770
      throws Http2Exception {
771
    try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.sendGrpcFrame")) {
1✔
772
      PerfMark.attachTag(cmd.stream().tag());
1✔
773
      PerfMark.linkIn(cmd.getLink());
1✔
774
      int streamId = cmd.stream().id();
1✔
775
      Http2Stream stream = connection().stream(streamId);
1✔
776
      if (stream == null) {
1✔
777
        cmd.release();
1✔
778
        streamGone(streamId, promise);
1✔
779
        return;
1✔
780
      }
781
      if (cmd.endStream()) {
1✔
782
        closeStreamWhenDone(promise, stream);
×
783
      }
784
      // Call the base class to write the HTTP/2 DATA frame.
785
      encoder().writeData(ctx, streamId, cmd.content(), 0, cmd.endStream(), promise);
1✔
786
    }
787
  }
1✔
788

789
  /**
790
   * Sends the response headers to the client.
791
   */
792
  private void sendResponseHeaders(ChannelHandlerContext ctx, SendResponseHeadersCommand cmd,
793
      ChannelPromise promise) throws Http2Exception {
794
    try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.sendResponseHeaders")) {
1✔
795
      PerfMark.attachTag(cmd.stream().tag());
1✔
796
      PerfMark.linkIn(cmd.getLink());
1✔
797
      int streamId = cmd.stream().id();
1✔
798
      Http2Stream stream = connection().stream(streamId);
1✔
799
      if (stream == null) {
1✔
800
        streamGone(streamId, promise);
1✔
801
        return;
1✔
802
      }
803
      if (cmd.endOfStream()) {
1✔
804
        closeStreamWhenDone(promise, stream);
1✔
805
      }
806
      encoder().writeHeaders(ctx, streamId, cmd.headers(), 0, cmd.endOfStream(), promise);
1✔
807
    }
808
  }
1✔
809

810
  private void cancelStream(ChannelHandlerContext ctx, CancelServerStreamCommand cmd,
811
      ChannelPromise promise) {
812
    try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.cancelStream")) {
1✔
813
      PerfMark.attachTag(cmd.stream().tag());
1✔
814
      PerfMark.linkIn(cmd.getLink());
1✔
815
      // Notify the listener if we haven't already.
816
      cmd.stream().transportReportStatus(cmd.reason());
1✔
817

818
      // Now we need to decide how we're going to notify the peer that this stream is closed.
819
      // If possible, it's nice to inform the peer _why_ this stream was cancelled by sending
820
      // a structured headers frame.
821
      if (shouldCloseStreamWithHeaders(cmd, connection())) {
1✔
822
        Metadata md = new Metadata();
1✔
823
        md.put(InternalStatus.CODE_KEY, cmd.reason());
1✔
824
        if (cmd.reason().getDescription() != null) {
1✔
825
          md.put(InternalStatus.MESSAGE_KEY, cmd.reason().getDescription());
1✔
826
        }
827
        Http2Headers headers = Utils.convertServerHeaders(md);
1✔
828
        encoder().writeHeaders(
1✔
829
            ctx, cmd.stream().id(), headers, /* padding = */ 0, /* endStream = */ true, promise);
1✔
830
      } else {
1✔
831
        // Terminate the stream.
832
        encoder().writeRstStream(ctx, cmd.stream().id(), Http2Error.CANCEL.code(), promise);
1✔
833
      }
834
    }
835
  }
1✔
836

837
  // Determine whether a CancelServerStreamCommand should try to close the stream with a
838
  // HEADERS or a RST_STREAM frame. The caller has some influence over this (they can
839
  // configure cmd.wantsHeaders()). The state of the stream also has an influence: we
840
  // only try to send HEADERS if the stream exists and hasn't already sent any headers.
841
  private static boolean shouldCloseStreamWithHeaders(
842
          CancelServerStreamCommand cmd, Http2Connection conn) {
843
    if (!cmd.wantsHeaders()) {
1✔
844
      return false;
1✔
845
    }
846
    Http2Stream stream = conn.stream(cmd.stream().id());
1✔
847
    return stream != null && !stream.isHeadersSent();
1✔
848
  }
849

850
  private void gracefulClose(final ChannelHandlerContext ctx, final GracefulServerCloseCommand msg,
851
      ChannelPromise promise) throws Exception {
852
    // Ideally we'd adjust a pre-existing graceful shutdown's grace period to at least what is
853
    // requested here. But that's an edge case and seems bug-prone.
854
    if (gracefulShutdown == null) {
1✔
855
      Long graceTimeInNanos = null;
1✔
856
      if (msg.getGraceTimeUnit() != null) {
1✔
857
        graceTimeInNanos = msg.getGraceTimeUnit().toNanos(msg.getGraceTime());
1✔
858
      }
859
      gracefulShutdown = new GracefulShutdown(msg.getGoAwayDebugString(), graceTimeInNanos);
1✔
860
      gracefulShutdown.start(ctx);
1✔
861
    }
862
    promise.setSuccess();
1✔
863
  }
1✔
864

865
  private void forcefulClose(final ChannelHandlerContext ctx, final ForcefulCloseCommand msg,
866
      ChannelPromise promise) throws Exception {
867
    super.close(ctx, promise);
1✔
868
    connection().forEachActiveStream(new Http2StreamVisitor() {
1✔
869
      @Override
870
      public boolean visit(Http2Stream stream) throws Http2Exception {
871
        NettyServerStream.TransportState serverStream = serverStream(stream);
1✔
872
        if (serverStream != null) {
1✔
873
          try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.forcefulClose")) {
1✔
874
            PerfMark.attachTag(serverStream.tag());
1✔
875
            PerfMark.linkIn(msg.getLink());
1✔
876
            serverStream.transportReportStatus(msg.getStatus());
1✔
877
            resetStream(ctx, stream.id(), Http2Error.CANCEL.code(), ctx.newPromise());
1✔
878
          }
879
        }
880
        stream.close();
1✔
881
        return true;
1✔
882
      }
883
    });
884
  }
1✔
885

886
  private void respondWithHttpError(
887
      ChannelHandlerContext ctx, int streamId, int code, Status.Code statusCode, String msg) {
888
    respondWithHttpError(ctx, streamId, code, statusCode, msg, EmptyHttp2Headers.INSTANCE);
1✔
889
  }
1✔
890

891
  private void respondWithHttpError(
892
      ChannelHandlerContext ctx, int streamId, int code, Status.Code statusCode, String msg,
893
      Http2Headers extraHeaders) {
894
    Metadata metadata = new Metadata();
1✔
895
    metadata.put(InternalStatus.CODE_KEY, statusCode.toStatus());
1✔
896
    metadata.put(InternalStatus.MESSAGE_KEY, msg);
1✔
897
    byte[][] serialized = InternalMetadata.serialize(metadata);
1✔
898

899
    Http2Headers headers = new DefaultHttp2Headers(true, serialized.length / 2)
1✔
900
        .status("" + code)
1✔
901
        .set(CONTENT_TYPE_HEADER, "text/plain; charset=utf-8");
1✔
902
    for (int i = 0; i < serialized.length; i += 2) {
1✔
903
      headers.add(new AsciiString(serialized[i], false), new AsciiString(serialized[i + 1], false));
1✔
904
    }
905
    headers.add(extraHeaders);
1✔
906
    encoder().writeHeaders(ctx, streamId, headers, 0, false, ctx.newPromise());
1✔
907
    ByteBuf msgBuf = ByteBufUtil.writeUtf8(ctx.alloc(), msg);
1✔
908
    encoder().writeData(ctx, streamId, msgBuf, 0, true, ctx.newPromise());
1✔
909
  }
1✔
910

911
  private Http2Stream requireHttp2Stream(int streamId) {
912
    Http2Stream stream = connection().stream(streamId);
1✔
913
    if (stream == null) {
1✔
914
      // This should never happen.
915
      throw new AssertionError("Stream does not exist: " + streamId);
×
916
    }
917
    return stream;
1✔
918
  }
919

920
  /**
921
   * Returns the server stream associated to the given HTTP/2 stream object.
922
   */
923
  private NettyServerStream.TransportState serverStream(Http2Stream stream) {
924
    return stream == null ? null : (NettyServerStream.TransportState) stream.getProperty(streamKey);
1✔
925
  }
926

927
  private Http2Exception newStreamException(int streamId, Throwable cause) {
928
    return Http2Exception.streamError(
×
929
        streamId, Http2Error.INTERNAL_ERROR, cause, Strings.nullToEmpty(cause.getMessage()));
×
930
  }
931

932
  private class FrameListener extends Http2FrameAdapter {
1✔
933
    private boolean firstSettings = true;
1✔
934

935
    @Override
936
    public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) {
937
      if (firstSettings) {
1✔
938
        firstSettings = false;
1✔
939
        // Delay transportReady until we see the client's HTTP handshake, for coverage with
940
        // handshakeTimeout
941
        attributes = transportListener.transportReady(negotiationAttributes);
1✔
942
      }
943
    }
1✔
944

945
    @Override
946
    public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
947
        boolean endOfStream) throws Http2Exception {
948
      if (keepAliveManager != null) {
1✔
949
        keepAliveManager.onDataReceived();
1✔
950
      }
951
      NettyServerHandler.this.onDataRead(streamId, data, padding, endOfStream);
1✔
952
      return padding;
1✔
953
    }
954

955
    @Override
956
    public void onHeadersRead(ChannelHandlerContext ctx,
957
        int streamId,
958
        Http2Headers headers,
959
        int streamDependency,
960
        short weight,
961
        boolean exclusive,
962
        int padding,
963
        boolean endStream) throws Http2Exception {
964
      if (keepAliveManager != null) {
1✔
965
        keepAliveManager.onDataReceived();
1✔
966
      }
967
      NettyServerHandler.this.onHeadersRead(ctx, streamId, headers);
1✔
968
      if (endStream) {
1✔
969
        NettyServerHandler.this.onDataRead(streamId, Unpooled.EMPTY_BUFFER, 0, endStream);
×
970
      }
971
    }
1✔
972

973
    @Override
974
    public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode)
975
        throws Http2Exception {
976
      if (keepAliveManager != null) {
1✔
977
        keepAliveManager.onDataReceived();
1✔
978
      }
979
      NettyServerHandler.this.onRstStreamRead(streamId, errorCode);
1✔
980
    }
1✔
981

982
    @Override
983
    public void onPingRead(ChannelHandlerContext ctx, long data) throws Http2Exception {
984
      if (keepAliveManager != null) {
1✔
985
        keepAliveManager.onDataReceived();
1✔
986
      }
987
      if (!keepAliveEnforcer.pingAcceptable()) {
1✔
988
        ByteBuf debugData = ByteBufUtil.writeAscii(ctx.alloc(), "too_many_pings");
1✔
989
        goAway(ctx, connection().remote().lastStreamCreated(), Http2Error.ENHANCE_YOUR_CALM.code(),
1✔
990
            debugData, ctx.newPromise());
1✔
991
        Status status = Status.RESOURCE_EXHAUSTED.withDescription("Too many pings from client");
1✔
992
        try {
993
          forcefulClose(ctx, new ForcefulCloseCommand(status), ctx.newPromise());
1✔
994
        } catch (Exception ex) {
×
995
          onError(ctx, /* outbound= */ true, ex);
×
996
        }
1✔
997
      }
998
    }
1✔
999

1000
    @Override
1001
    public void onPingAckRead(ChannelHandlerContext ctx, long data) throws Http2Exception {
1002
      if (keepAliveManager != null) {
1✔
1003
        keepAliveManager.onDataReceived();
1✔
1004
      }
1005
      if (data == flowControlPing().payload()) {
1✔
1006
        flowControlPing().updateWindow();
1✔
1007
        logger.log(Level.FINE, "Window: {0}",
1✔
1008
            decoder().flowController().initialWindowSize(connection().connectionStream()));
1✔
1009
      } else if (data == GRACEFUL_SHUTDOWN_PING) {
1✔
1010
        if (gracefulShutdown == null) {
1✔
1011
          // this should never happen
1012
          logger.warning("Received GRACEFUL_SHUTDOWN_PING Ack but gracefulShutdown is null");
×
1013
        } else {
1014
          gracefulShutdown.secondGoAwayAndClose(ctx);
1✔
1015
        }
1016
      } else if (data != KEEPALIVE_PING) {
1✔
1017
        logger.warning("Received unexpected ping ack. No ping outstanding");
×
1018
      }
1019
    }
1✔
1020
  }
1021

1022
  private final class KeepAlivePinger implements KeepAliveManager.KeepAlivePinger {
1023
    final ChannelHandlerContext ctx;
1024

1025
    KeepAlivePinger(ChannelHandlerContext ctx) {
1✔
1026
      this.ctx = ctx;
1✔
1027
    }
1✔
1028

1029
    @Override
1030
    public void ping() {
1031
      ChannelFuture pingFuture = encoder().writePing(
1✔
1032
          ctx, false /* isAck */, KEEPALIVE_PING, ctx.newPromise());
1✔
1033
      ctx.flush();
1✔
1034
      pingFuture.addListener(new ChannelFutureListener() {
1✔
1035
        @Override
1036
        public void operationComplete(ChannelFuture future) throws Exception {
1037
          if (future.isSuccess()) {
1✔
1038
            transportTracer.reportKeepAliveSent();
1✔
1039
          }
1040
        }
1✔
1041
      });
1042
    }
1✔
1043

1044
    @Override
1045
    public void onPingTimeout() {
1046
      try {
1047
        forcefulClose(
1✔
1048
            ctx,
1049
            new ForcefulCloseCommand(Status.UNAVAILABLE
1050
                .withDescription("Keepalive failed. The connection is likely gone")),
1✔
1051
            ctx.newPromise());
1✔
1052
      } catch (Exception ex) {
×
1053
        try {
1054
          exceptionCaught(ctx, ex);
×
1055
        } catch (Exception ex2) {
×
1056
          logger.log(Level.WARNING, "Exception while propagating exception", ex2);
×
1057
          logger.log(Level.WARNING, "Original failure", ex);
×
1058
        }
×
1059
      }
1✔
1060
    }
1✔
1061
  }
1062

1063
  private final class GracefulShutdown {
1064
    String goAwayMessage;
1065

1066
    /**
1067
     * The grace time between starting graceful shutdown and closing the netty channel,
1068
     * {@code null} is unspecified.
1069
     */
1070
    @CheckForNull
1071
    Long graceTimeInNanos;
1072

1073
    /**
1074
     * True if ping is Acked or ping is timeout.
1075
     */
1076
    boolean pingAckedOrTimeout;
1077

1078
    Future<?> pingFuture;
1079

1080
    GracefulShutdown(String goAwayMessage,
1081
        @Nullable Long graceTimeInNanos) {
1✔
1082
      this.goAwayMessage = goAwayMessage;
1✔
1083
      this.graceTimeInNanos = graceTimeInNanos;
1✔
1084
    }
1✔
1085

1086
    /**
1087
     * Sends out first GOAWAY and ping, and schedules second GOAWAY and close.
1088
     */
1089
    void start(final ChannelHandlerContext ctx) {
1090
      goAway(
1✔
1091
          ctx,
1092
          Integer.MAX_VALUE,
1093
          Http2Error.NO_ERROR.code(),
1✔
1094
          ByteBufUtil.writeAscii(ctx.alloc(), goAwayMessage),
1✔
1095
          ctx.newPromise());
1✔
1096

1097
      pingFuture = ctx.executor().schedule(
1✔
1098
          new Runnable() {
1✔
1099
            @Override
1100
            public void run() {
1101
              secondGoAwayAndClose(ctx);
1✔
1102
            }
1✔
1103
          },
1104
          GRACEFUL_SHUTDOWN_PING_TIMEOUT_NANOS,
1✔
1105
          TimeUnit.NANOSECONDS);
1106

1107
      encoder().writePing(ctx, false /* isAck */, GRACEFUL_SHUTDOWN_PING, ctx.newPromise());
1✔
1108
    }
1✔
1109

1110
    void secondGoAwayAndClose(ChannelHandlerContext ctx) {
1111
      if (pingAckedOrTimeout) {
1✔
1112
        return;
×
1113
      }
1114
      pingAckedOrTimeout = true;
1✔
1115

1116
      checkNotNull(pingFuture, "pingFuture");
1✔
1117
      pingFuture.cancel(false);
1✔
1118

1119
      // send the second GOAWAY with last stream id
1120
      goAway(
1✔
1121
          ctx,
1122
          connection().remote().lastStreamCreated(),
1✔
1123
          Http2Error.NO_ERROR.code(),
1✔
1124
          ByteBufUtil.writeAscii(ctx.alloc(), goAwayMessage),
1✔
1125
          ctx.newPromise());
1✔
1126

1127
      // gracefully shutdown with specified grace time
1128
      long savedGracefulShutdownTimeMillis = gracefulShutdownTimeoutMillis();
1✔
1129
      long overriddenGraceTime = graceTimeOverrideMillis(savedGracefulShutdownTimeMillis);
1✔
1130
      try {
1131
        gracefulShutdownTimeoutMillis(overriddenGraceTime);
1✔
1132
        NettyServerHandler.super.close(ctx, ctx.newPromise());
1✔
1133
      } catch (Exception e) {
×
1134
        onError(ctx, /* outbound= */ true, e);
×
1135
      } finally {
1136
        gracefulShutdownTimeoutMillis(savedGracefulShutdownTimeMillis);
1✔
1137
      }
1138
    }
1✔
1139

1140
    private long graceTimeOverrideMillis(long originalMillis) {
1141
      if (graceTimeInNanos == null) {
1✔
1142
        return originalMillis;
1✔
1143
      }
1144
      if (graceTimeInNanos == MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE) {
1✔
1145
        // netty treats -1 as "no timeout"
1146
        return -1L;
1✔
1147
      }
1148
      return TimeUnit.NANOSECONDS.toMillis(graceTimeInNanos);
1✔
1149
    }
1150
  }
1151

1152
  // Use a frame writer so that we know when frames are through flow control and actually being
1153
  // written.
1154
  private static class WriteMonitoringFrameWriter extends DecoratingHttp2FrameWriter {
1155
    private final KeepAliveEnforcer keepAliveEnforcer;
1156

1157
    public WriteMonitoringFrameWriter(Http2FrameWriter delegate,
1158
        KeepAliveEnforcer keepAliveEnforcer) {
1159
      super(delegate);
1✔
1160
      this.keepAliveEnforcer = keepAliveEnforcer;
1✔
1161
    }
1✔
1162

1163
    @Override
1164
    public ChannelFuture writeData(ChannelHandlerContext ctx, int streamId, ByteBuf data,
1165
        int padding, boolean endStream, ChannelPromise promise) {
1166
      keepAliveEnforcer.resetCounters();
1✔
1167
      return super.writeData(ctx, streamId, data, padding, endStream, promise);
1✔
1168
    }
1169

1170
    @Override
1171
    public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
1172
        int padding, boolean endStream, ChannelPromise promise) {
1173
      keepAliveEnforcer.resetCounters();
1✔
1174
      return super.writeHeaders(ctx, streamId, headers, padding, endStream, promise);
1✔
1175
    }
1176

1177
    @Override
1178
    public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
1179
        int streamDependency, short weight, boolean exclusive, int padding, boolean endStream,
1180
        ChannelPromise promise) {
1181
      keepAliveEnforcer.resetCounters();
×
1182
      return super.writeHeaders(ctx, streamId, headers, streamDependency, weight, exclusive,
×
1183
          padding, endStream, promise);
1184
    }
1185
  }
1186

1187
  private static final class Http2RstCounterEncoder extends DecoratingHttp2ConnectionEncoder {
1188
    private final RstStreamCounter rstStreamCounter;
1189
    private Http2LifecycleManager lifecycleManager;
1190

1191
    Http2RstCounterEncoder(Http2ConnectionEncoder encoder, RstStreamCounter rstStreamCounter) {
1192
      super(encoder);
1✔
1193
      this.rstStreamCounter = rstStreamCounter;
1✔
1194
    }
1✔
1195

1196
    @Override
1197
    public void lifecycleManager(Http2LifecycleManager lifecycleManager) {
1198
      this.lifecycleManager = lifecycleManager;
1✔
1199
      super.lifecycleManager(lifecycleManager);
1✔
1200
    }
1✔
1201

1202
    @Override
1203
    public ChannelFuture writeRstStream(
1204
        ChannelHandlerContext ctx, int streamId, long errorCode, ChannelPromise promise) {
1205
      ChannelFuture future = super.writeRstStream(ctx, streamId, errorCode, promise);
1✔
1206
      // We want to count "induced" RST_STREAM, where the server sent a reset because of a malformed
1207
      // frame.
1208
      boolean normalRst
1✔
1209
          = errorCode == Http2Error.NO_ERROR.code() || errorCode == Http2Error.CANCEL.code();
1✔
1210
      if (!normalRst) {
1✔
1211
        Http2Exception tooManyRstStream = rstStreamCounter.countRstStream();
1✔
1212
        if (tooManyRstStream != null) {
1✔
1213
          lifecycleManager.onError(ctx, true, tooManyRstStream);
1✔
1214
          ctx.close();
1✔
1215
        }
1216
      }
1217
      return future;
1✔
1218
    }
1219
  }
1220

1221
  private static final class RstStreamCounter {
1222
    private final int maxRstCount;
1223
    private final long maxRstPeriodNanos;
1224
    private final Ticker ticker;
1225
    private int rstCount;
1226
    private long lastRstNanoTime;
1227

1228
    RstStreamCounter(int maxRstCount, long maxRstPeriodNanos, Ticker ticker) {
1✔
1229
      checkArgument(maxRstCount >= 0, "maxRstCount must be non-negative: %s", maxRstCount);
1✔
1230
      this.maxRstCount = maxRstCount;
1✔
1231
      this.maxRstPeriodNanos = maxRstPeriodNanos;
1✔
1232
      this.ticker = checkNotNull(ticker, "ticker");
1✔
1233
      this.lastRstNanoTime = ticker.read();
1✔
1234
    }
1✔
1235

1236
    /** Returns non-{@code null} when the connection should be killed by the caller. */
1237
    private Http2Exception countRstStream() {
1238
      if (maxRstCount == 0) {
1✔
1239
        return null;
1✔
1240
      }
1241
      long now = ticker.read();
1✔
1242
      if (now - lastRstNanoTime > maxRstPeriodNanos) {
1✔
1243
        lastRstNanoTime = now;
1✔
1244
        rstCount = 1;
1✔
1245
      } else {
1246
        rstCount++;
1✔
1247
        if (rstCount > maxRstCount) {
1✔
1248
          return new Http2Exception(Http2Error.ENHANCE_YOUR_CALM, "too_many_rststreams") {
1✔
1249
            @SuppressWarnings("UnsynchronizedOverridesSynchronized") // No memory accesses
1250
            @Override
1251
            public Throwable fillInStackTrace() {
1252
              // Avoid the CPU cycles, since the resets may be a CPU consumption attack
1253
              return this;
1✔
1254
            }
1255
          };
1256
        }
1257
      }
1258
      return null;
1✔
1259
    }
1260
  }
1261

1262
  private static class ServerChannelLogger extends ChannelLogger {
1263
    private static final Logger log = Logger.getLogger(ChannelLogger.class.getName());
1✔
1264

1265
    @Override
1266
    public void log(ChannelLogLevel level, String message) {
1267
      log.log(toJavaLogLevel(level), message);
1✔
1268
    }
1✔
1269

1270
    @Override
1271
    public void log(ChannelLogLevel level, String messageFormat, Object... args) {
1272
      log(level, MessageFormat.format(messageFormat, args));
1✔
1273
    }
1✔
1274
  }
1275

1276
  private static Level toJavaLogLevel(ChannelLogLevel level) {
1277
    switch (level) {
1✔
1278
      case ERROR:
1279
        return Level.FINE;
×
1280
      case WARNING:
1281
        return Level.FINER;
×
1282
      default:
1283
        return Level.FINEST;
1✔
1284
    }
1285
  }
1286
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc