• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #20230

31 Mar 2026 09:55AM UTC coverage: 88.734% (+0.01%) from 88.72%
#20230

push

github

web-flow
openTelemetry: add tcp metrics (#12652)

Implements [A80](https://github.com/grpc/proposal/pull/519)

35697 of 40229 relevant lines covered (88.73%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.21
/../netty/src/main/java/io/grpc/netty/NettyServerHandler.java
1
/*
2
 * Copyright 2014 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.netty;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static io.grpc.internal.GrpcUtil.SERVER_KEEPALIVE_TIME_NANOS_DISABLED;
22
import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE;
23
import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_AGE_NANOS_DISABLED;
24
import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_IDLE_NANOS_DISABLED;
25
import static io.grpc.netty.Utils.CONTENT_TYPE_HEADER;
26
import static io.grpc.netty.Utils.HTTP_METHOD;
27
import static io.grpc.netty.Utils.TE_HEADER;
28
import static io.grpc.netty.Utils.TE_TRAILERS;
29
import static io.netty.handler.codec.http.HttpHeaderNames.CONNECTION;
30
import static io.netty.handler.codec.http.HttpHeaderNames.HOST;
31
import static io.netty.handler.codec.http2.DefaultHttp2LocalFlowController.DEFAULT_WINDOW_UPDATE_RATIO;
32
import static io.netty.handler.codec.http2.Http2Headers.PseudoHeaderName.AUTHORITY;
33

34
import com.google.common.annotations.VisibleForTesting;
35
import com.google.common.base.Preconditions;
36
import com.google.common.base.Strings;
37
import com.google.common.base.Ticker;
38
import io.grpc.Attributes;
39
import io.grpc.ChannelLogger;
40
import io.grpc.ChannelLogger.ChannelLogLevel;
41
import io.grpc.InternalChannelz;
42
import io.grpc.InternalMetadata;
43
import io.grpc.InternalStatus;
44
import io.grpc.Metadata;
45
import io.grpc.MetricRecorder;
46
import io.grpc.ServerStreamTracer;
47
import io.grpc.Status;
48
import io.grpc.internal.GrpcUtil;
49
import io.grpc.internal.KeepAliveEnforcer;
50
import io.grpc.internal.KeepAliveManager;
51
import io.grpc.internal.LogExceptionRunnable;
52
import io.grpc.internal.MaxConnectionIdleManager;
53
import io.grpc.internal.ServerTransportListener;
54
import io.grpc.internal.StatsTraceContext;
55
import io.grpc.internal.TransportTracer;
56
import io.grpc.netty.GrpcHttp2HeadersUtils.GrpcHttp2ServerHeadersDecoder;
57
import io.netty.buffer.ByteBuf;
58
import io.netty.buffer.ByteBufUtil;
59
import io.netty.buffer.Unpooled;
60
import io.netty.channel.ChannelFuture;
61
import io.netty.channel.ChannelFutureListener;
62
import io.netty.channel.ChannelHandlerContext;
63
import io.netty.channel.ChannelPromise;
64
import io.netty.handler.codec.http.HttpHeaderNames;
65
import io.netty.handler.codec.http2.DecoratingHttp2ConnectionEncoder;
66
import io.netty.handler.codec.http2.DecoratingHttp2FrameWriter;
67
import io.netty.handler.codec.http2.DefaultHttp2Connection;
68
import io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder;
69
import io.netty.handler.codec.http2.DefaultHttp2ConnectionEncoder;
70
import io.netty.handler.codec.http2.DefaultHttp2FrameReader;
71
import io.netty.handler.codec.http2.DefaultHttp2FrameWriter;
72
import io.netty.handler.codec.http2.DefaultHttp2Headers;
73
import io.netty.handler.codec.http2.DefaultHttp2HeadersEncoder;
74
import io.netty.handler.codec.http2.DefaultHttp2LocalFlowController;
75
import io.netty.handler.codec.http2.DefaultHttp2RemoteFlowController;
76
import io.netty.handler.codec.http2.EmptyHttp2Headers;
77
import io.netty.handler.codec.http2.Http2Connection;
78
import io.netty.handler.codec.http2.Http2ConnectionAdapter;
79
import io.netty.handler.codec.http2.Http2ConnectionDecoder;
80
import io.netty.handler.codec.http2.Http2ConnectionEncoder;
81
import io.netty.handler.codec.http2.Http2Error;
82
import io.netty.handler.codec.http2.Http2Exception;
83
import io.netty.handler.codec.http2.Http2Exception.StreamException;
84
import io.netty.handler.codec.http2.Http2FrameAdapter;
85
import io.netty.handler.codec.http2.Http2FrameLogger;
86
import io.netty.handler.codec.http2.Http2FrameReader;
87
import io.netty.handler.codec.http2.Http2FrameWriter;
88
import io.netty.handler.codec.http2.Http2Headers;
89
import io.netty.handler.codec.http2.Http2HeadersDecoder;
90
import io.netty.handler.codec.http2.Http2HeadersEncoder;
91
import io.netty.handler.codec.http2.Http2InboundFrameLogger;
92
import io.netty.handler.codec.http2.Http2LifecycleManager;
93
import io.netty.handler.codec.http2.Http2OutboundFrameLogger;
94
import io.netty.handler.codec.http2.Http2Settings;
95
import io.netty.handler.codec.http2.Http2Stream;
96
import io.netty.handler.codec.http2.Http2StreamVisitor;
97
import io.netty.handler.codec.http2.UniformStreamByteDistributor;
98
import io.netty.handler.logging.LogLevel;
99
import io.netty.util.AsciiString;
100
import io.netty.util.ReferenceCountUtil;
101
import io.perfmark.PerfMark;
102
import io.perfmark.Tag;
103
import io.perfmark.TaskCloseable;
104
import java.text.MessageFormat;
105
import java.util.List;
106
import java.util.concurrent.Future;
107
import java.util.concurrent.ScheduledFuture;
108
import java.util.concurrent.TimeUnit;
109
import java.util.logging.Level;
110
import java.util.logging.Logger;
111
import javax.annotation.CheckForNull;
112
import javax.annotation.Nullable;
113

114
/**
115
 * Server-side Netty handler for GRPC processing. All event handlers are executed entirely within
116
 * the context of the Netty Channel thread.
117
 */
118
class NettyServerHandler extends AbstractNettyHandler {
119
  private static final Logger logger = Logger.getLogger(NettyServerHandler.class.getName());
1✔
120
  private static final long KEEPALIVE_PING = 0xDEADL;
121
  @VisibleForTesting
122
  static final long GRACEFUL_SHUTDOWN_PING = 0x97ACEF001L;
123
  private static final long GRACEFUL_SHUTDOWN_PING_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(10);
1✔
124
  /** Temporary workaround for #8674. Fine to delete after v1.45 release, and maybe earlier. */
125
  private static final boolean DISABLE_CONNECTION_HEADER_CHECK = Boolean.parseBoolean(
1✔
126
      System.getProperty("io.grpc.netty.disableConnectionHeaderCheck", "false"));
1✔
127

128
  private final Http2Connection.PropertyKey streamKey;
129
  private final ServerTransportListener transportListener;
130
  private final int maxMessageSize;
131
  private final TcpMetrics tcpMetrics;
132
  private final long keepAliveTimeInNanos;
133
  private final long keepAliveTimeoutInNanos;
134
  private final long maxConnectionAgeInNanos;
135
  private final long maxConnectionAgeGraceInNanos;
136
  private final RstStreamCounter rstStreamCounter;
137
  private final List<? extends ServerStreamTracer.Factory> streamTracerFactories;
138
  private final TransportTracer transportTracer;
139
  private final KeepAliveEnforcer keepAliveEnforcer;
140
  private final Attributes eagAttributes;
141
  /** Incomplete attributes produced by negotiator. */
142
  private Attributes negotiationAttributes;
143
  private InternalChannelz.Security securityInfo;
144
  /** Completed attributes produced by transportReady. */
145
  private Attributes attributes;
146
  private Throwable connectionError;
147
  private boolean teWarningLogged;
148
  private WriteQueue serverWriteQueue;
149
  private AsciiString lastKnownAuthority;
150
  @CheckForNull
151
  private KeepAliveManager keepAliveManager;
152
  @CheckForNull
153
  private MaxConnectionIdleManager maxConnectionIdleManager;
154
  @CheckForNull
155
  private ScheduledFuture<?> maxConnectionAgeMonitor;
156
  @CheckForNull
157
  private GracefulShutdown gracefulShutdown;
158

159
  static NettyServerHandler newHandler(
160
      ServerTransportListener transportListener,
161
      ChannelPromise channelUnused,
162
      List<? extends ServerStreamTracer.Factory> streamTracerFactories,
163
      TransportTracer transportTracer,
164
      int maxStreams,
165
      boolean autoFlowControl,
166
      int flowControlWindow,
167
      int maxHeaderListSize,
168
      int softLimitHeaderListSize,
169
      int maxMessageSize,
170
      long keepAliveTimeInNanos,
171
      long keepAliveTimeoutInNanos,
172
      long maxConnectionIdleInNanos,
173
      long maxConnectionAgeInNanos,
174
      long maxConnectionAgeGraceInNanos,
175
      boolean permitKeepAliveWithoutCalls,
176
      long permitKeepAliveTimeInNanos,
177
      int maxRstCount,
178
      long maxRstPeriodNanos,
179
      Attributes eagAttributes,
180
      MetricRecorder metricRecorder) {
181
    Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive: %s",
1✔
182
        maxHeaderListSize);
183
    Http2FrameLogger frameLogger = new Http2FrameLogger(LogLevel.DEBUG, NettyServerHandler.class);
1✔
184
    Http2HeadersDecoder headersDecoder = new GrpcHttp2ServerHeadersDecoder(maxHeaderListSize);
1✔
185
    Http2FrameReader frameReader = new Http2InboundFrameLogger(
1✔
186
        new DefaultHttp2FrameReader(headersDecoder), frameLogger);
187
    Http2HeadersEncoder encoder = new DefaultHttp2HeadersEncoder(
1✔
188
        Http2HeadersEncoder.NEVER_SENSITIVE, false, 16, Integer.MAX_VALUE);
189
    Http2FrameWriter frameWriter =
1✔
190
        new Http2OutboundFrameLogger(new DefaultHttp2FrameWriter(encoder), frameLogger);
191
    return newHandler(
1✔
192
        channelUnused,
193
        frameReader,
194
        frameWriter,
195
        transportListener,
196
        streamTracerFactories,
197
        transportTracer,
198
        maxStreams,
199
        autoFlowControl,
200
        flowControlWindow,
201
        maxHeaderListSize,
202
        softLimitHeaderListSize,
203
        maxMessageSize,
204
        keepAliveTimeInNanos,
205
        keepAliveTimeoutInNanos,
206
        maxConnectionIdleInNanos,
207
        maxConnectionAgeInNanos,
208
        maxConnectionAgeGraceInNanos,
209
        permitKeepAliveWithoutCalls,
210
        permitKeepAliveTimeInNanos,
211
        maxRstCount,
212
        maxRstPeriodNanos,
213
        eagAttributes,
214
        Ticker.systemTicker(),
1✔
215
        metricRecorder);
216
  }
217

218
  static NettyServerHandler newHandler(
219
      ChannelPromise channelUnused,
220
      Http2FrameReader frameReader,
221
      Http2FrameWriter frameWriter,
222
      ServerTransportListener transportListener,
223
      List<? extends ServerStreamTracer.Factory> streamTracerFactories,
224
      TransportTracer transportTracer,
225
      int maxStreams,
226
      boolean autoFlowControl,
227
      int flowControlWindow,
228
      int maxHeaderListSize,
229
      int softLimitHeaderListSize,
230
      int maxMessageSize,
231
      long keepAliveTimeInNanos,
232
      long keepAliveTimeoutInNanos,
233
      long maxConnectionIdleInNanos,
234
      long maxConnectionAgeInNanos,
235
      long maxConnectionAgeGraceInNanos,
236
      boolean permitKeepAliveWithoutCalls,
237
      long permitKeepAliveTimeInNanos,
238
      int maxRstCount,
239
      long maxRstPeriodNanos,
240
      Attributes eagAttributes,
241
      Ticker ticker,
242
      MetricRecorder metricRecorder) {
243
    Preconditions.checkArgument(maxStreams > 0, "maxStreams must be positive: %s", maxStreams);
1✔
244
    Preconditions.checkArgument(flowControlWindow > 0, "flowControlWindow must be positive: %s",
1✔
245
        flowControlWindow);
246
    Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive: %s",
1✔
247
        maxHeaderListSize);
248
    Preconditions.checkArgument(
1✔
249
        softLimitHeaderListSize > 0, "softLimitHeaderListSize must be positive: %s",
250
        softLimitHeaderListSize);
251
    Preconditions.checkArgument(maxMessageSize > 0, "maxMessageSize must be positive: %s",
1✔
252
        maxMessageSize);
253

254
    final Http2Connection connection = new DefaultHttp2Connection(true);
1✔
255
    UniformStreamByteDistributor dist = new UniformStreamByteDistributor(connection);
1✔
256
    dist.minAllocationChunk(MIN_ALLOCATED_CHUNK); // Increased for benchmarks performance.
1✔
257
    DefaultHttp2RemoteFlowController controller =
1✔
258
        new DefaultHttp2RemoteFlowController(connection, dist);
259
    connection.remote().flowController(controller);
1✔
260
    final KeepAliveEnforcer keepAliveEnforcer = new KeepAliveEnforcer(
1✔
261
        permitKeepAliveWithoutCalls, permitKeepAliveTimeInNanos, TimeUnit.NANOSECONDS);
262

263
    if (ticker == null) {
1✔
264
      ticker = Ticker.systemTicker();
×
265
    }
266

267
    RstStreamCounter rstStreamCounter
1✔
268
        = new RstStreamCounter(maxRstCount, maxRstPeriodNanos, ticker);
269
    // Create the local flow controller configured to auto-refill the connection window.
270
    connection.local().flowController(
1✔
271
        new DefaultHttp2LocalFlowController(connection, DEFAULT_WINDOW_UPDATE_RATIO, true));
272
    frameWriter = new WriteMonitoringFrameWriter(frameWriter, keepAliveEnforcer);
1✔
273
    Http2ConnectionEncoder encoder =
1✔
274
        new DefaultHttp2ConnectionEncoder(connection, frameWriter);
275
    encoder = new Http2ControlFrameLimitEncoder(encoder, 10000);
1✔
276
    encoder = new Http2RstCounterEncoder(encoder, rstStreamCounter);
1✔
277
    Http2ConnectionDecoder decoder = new DefaultHttp2ConnectionDecoder(connection, encoder,
1✔
278
        frameReader);
279

280
    Http2Settings settings = new Http2Settings();
1✔
281
    settings.initialWindowSize(flowControlWindow);
1✔
282
    settings.maxConcurrentStreams(maxStreams);
1✔
283
    settings.maxHeaderListSize(maxHeaderListSize);
1✔
284

285
    return new NettyServerHandler(
1✔
286
        channelUnused,
287
        connection,
288
        transportListener,
289
        streamTracerFactories,
290
        transportTracer,
291
        decoder, encoder, settings,
292
        maxMessageSize,
293
        maxHeaderListSize,
294
        softLimitHeaderListSize,
295
        keepAliveTimeInNanos,
296
        keepAliveTimeoutInNanos,
297
        maxConnectionIdleInNanos,
298
        maxConnectionAgeInNanos, maxConnectionAgeGraceInNanos,
299
        keepAliveEnforcer,
300
        autoFlowControl,
301
        rstStreamCounter,
302
        eagAttributes, ticker,
303
        metricRecorder);
304
  }
305

306
  /**
   * Wires the handler to an already assembled HTTP/2 connection, decoder and encoder.
   *
   * <p>A connection listener is registered that notifies the keepalive enforcer, and the max
   * connection idle manager when one is configured, each time the number of active streams
   * becomes one after a stream activates or zero after a stream closes. The decoder's frame
   * listener is installed as the last step.
   *
   * @throws IllegalArgumentException if {@code maxMessageSize} is negative
   * @throws NullPointerException if {@code keepAliveEnforcer}, {@code eagAttributes},
   *     {@code transportListener}, {@code streamTracerFactories} or {@code transportTracer} is
   *     {@code null}
   */
  private NettyServerHandler(
      ChannelPromise channelUnused,
      final Http2Connection connection,
      ServerTransportListener transportListener,
      List<? extends ServerStreamTracer.Factory> streamTracerFactories,
      TransportTracer transportTracer,
      Http2ConnectionDecoder decoder,
      Http2ConnectionEncoder encoder,
      Http2Settings settings,
      int maxMessageSize,
      int maxHeaderListSize,
      int softLimitHeaderListSize,
      long keepAliveTimeInNanos,
      long keepAliveTimeoutInNanos,
      long maxConnectionIdleInNanos,
      long maxConnectionAgeInNanos,
      long maxConnectionAgeGraceInNanos,
      final KeepAliveEnforcer keepAliveEnforcer,
      boolean autoFlowControl,
      RstStreamCounter rstStreamCounter,
      Attributes eagAttributes,
      Ticker ticker,
      MetricRecorder metricRecorder) {
    super(
        channelUnused,
        decoder,
        encoder,
        settings,
        new ServerChannelLogger(),
        autoFlowControl,
        null,
        ticker,
        maxHeaderListSize,
        softLimitHeaderListSize);

    // No manager is created when the idle limit is disabled.
    final MaxConnectionIdleManager idleManager =
        maxConnectionIdleInNanos == MAX_CONNECTION_IDLE_NANOS_DISABLED
            ? null
            : new MaxConnectionIdleManager(maxConnectionIdleInNanos);

    connection.addListener(new Http2ConnectionAdapter() {
      @Override
      public void onStreamActive(Http2Stream stream) {
        // Only react when this activation leaves exactly one active stream.
        if (connection.numActiveStreams() != 1) {
          return;
        }
        keepAliveEnforcer.onTransportActive();
        if (idleManager != null) {
          idleManager.onTransportActive();
        }
      }

      @Override
      public void onStreamClosed(Http2Stream stream) {
        // Only react when this close leaves no active streams.
        if (connection.numActiveStreams() != 0) {
          return;
        }
        keepAliveEnforcer.onTransportIdle();
        if (idleManager != null) {
          idleManager.onTransportIdle();
        }
      }
    });

    checkArgument(maxMessageSize >= 0, "maxMessageSize must be non-negative: %s", maxMessageSize);
    this.maxMessageSize = maxMessageSize;
    this.tcpMetrics = new TcpMetrics(metricRecorder);
    this.keepAliveTimeInNanos = keepAliveTimeInNanos;
    this.keepAliveTimeoutInNanos = keepAliveTimeoutInNanos;
    this.maxConnectionIdleManager = idleManager;
    this.maxConnectionAgeInNanos = maxConnectionAgeInNanos;
    this.maxConnectionAgeGraceInNanos = maxConnectionAgeGraceInNanos;
    this.keepAliveEnforcer = checkNotNull(keepAliveEnforcer, "keepAliveEnforcer");
    this.rstStreamCounter = rstStreamCounter;
    this.eagAttributes = checkNotNull(eagAttributes, "eagAttributes");

    // Key under which each stream's transport state is stored on its Http2Stream.
    this.streamKey = encoder.connection().newKey();
    this.transportListener = checkNotNull(transportListener, "transportListener");
    this.streamTracerFactories = checkNotNull(streamTracerFactories, "streamTracerFactories");
    this.transportTracer = checkNotNull(transportTracer, "transportTracer");

    // Set the frame listener on the decoder.
    decoder().frameListener(new FrameListener());
  }
1✔
389

390
  @Nullable
391
  Throwable connectionError() {
392
    return connectionError;
1✔
393
  }
394

395
  @Override
396
  public void handlerAdded(final ChannelHandlerContext ctx) throws Exception {
397
    serverWriteQueue = new WriteQueue(ctx.channel());
1✔
398

399
    // init max connection age monitor
400
    if (maxConnectionAgeInNanos != MAX_CONNECTION_AGE_NANOS_DISABLED) {
1✔
401
      maxConnectionAgeMonitor = ctx.executor().schedule(
1✔
402
          new LogExceptionRunnable(new Runnable() {
1✔
403
            @Override
404
            public void run() {
405
              if (gracefulShutdown == null) {
1✔
406
                gracefulShutdown = new GracefulShutdown("max_age", maxConnectionAgeGraceInNanos);
1✔
407
                gracefulShutdown.start(ctx);
1✔
408
                ctx.flush();
1✔
409
              }
410
            }
1✔
411
          }),
412
          maxConnectionAgeInNanos,
413
          TimeUnit.NANOSECONDS);
414
    }
415

416
    if (maxConnectionIdleManager != null) {
1✔
417
      maxConnectionIdleManager.start(new Runnable() {
1✔
418
        @Override
419
        public void run() {
420
          if (gracefulShutdown == null) {
1✔
421
            gracefulShutdown = new GracefulShutdown("max_idle", null);
1✔
422
            gracefulShutdown.start(ctx);
1✔
423
            ctx.flush();
1✔
424
          }
425
        }
1✔
426
      }, ctx.executor());
1✔
427
    }
428

429
    if (keepAliveTimeInNanos != SERVER_KEEPALIVE_TIME_NANOS_DISABLED) {
1✔
430
      keepAliveManager = new KeepAliveManager(new KeepAlivePinger(ctx), ctx.executor(),
1✔
431
          keepAliveTimeInNanos, keepAliveTimeoutInNanos, true /* keepAliveDuringTransportIdle */);
432
      keepAliveManager.onTransportStarted();
1✔
433
    }
434

435
    assert encoder().connection().equals(decoder().connection());
1✔
436
    transportTracer.setFlowControlWindowReader(new Utils.FlowControlReader(encoder().connection()));
1✔
437

438
    super.handlerAdded(ctx);
1✔
439
  }
1✔
440

441
  private void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers)
442
      throws Http2Exception {
443
    try {
444
      // Connection-specific header fields makes a request malformed. Ideally this would be handled
445
      // by Netty. RFC 7540 section 8.1.2.2
446
      if (!DISABLE_CONNECTION_HEADER_CHECK && headers.contains(CONNECTION)) {
1✔
447
        resetStream(ctx, streamId, Http2Error.PROTOCOL_ERROR.code(), ctx.newPromise());
×
448
        return;
×
449
      }
450

451
      if (headers.authority() == null) {
1✔
452
        List<CharSequence> hosts = headers.getAll(HOST);
1✔
453
        if (hosts.size() > 1) {
1✔
454
          // RFC 7230 section 5.4
455
          respondWithHttpError(ctx, streamId, 400, Status.Code.INTERNAL,
1✔
456
              "Multiple host headers");
457
          return;
1✔
458
        }
459
        if (!hosts.isEmpty()) {
1✔
460
          headers.add(AUTHORITY.value(), hosts.get(0));
1✔
461
        }
462
      }
463
      headers.remove(HOST);
1✔
464

465
      // Remove the leading slash of the path and get the fully qualified method name
466
      CharSequence path = headers.path();
1✔
467

468
      if (path == null) {
1✔
469
        respondWithHttpError(ctx, streamId, 404, Status.Code.UNIMPLEMENTED,
1✔
470
            "Expected path but is missing");
471
        return;
1✔
472
      }
473

474
      if (path.charAt(0) != '/') {
1✔
475
        respondWithHttpError(ctx, streamId, 404, Status.Code.UNIMPLEMENTED,
1✔
476
            String.format("Expected path to start with /: %s", path));
1✔
477
        return;
1✔
478
      }
479

480
      String method = path.subSequence(1, path.length()).toString();
1✔
481

482
      // Verify that the Content-Type is correct in the request.
483
      CharSequence contentType = headers.get(CONTENT_TYPE_HEADER);
1✔
484
      if (contentType == null) {
1✔
485
        respondWithHttpError(
1✔
486
            ctx, streamId, 415, Status.Code.INTERNAL, "Content-Type is missing from the request");
487
        return;
1✔
488
      }
489
      String contentTypeString = contentType.toString();
1✔
490
      if (!GrpcUtil.isGrpcContentType(contentTypeString)) {
1✔
491
        respondWithHttpError(ctx, streamId, 415, Status.Code.INTERNAL,
1✔
492
            String.format("Content-Type '%s' is not supported", contentTypeString));
1✔
493
        return;
1✔
494
      }
495

496
      if (!HTTP_METHOD.contentEquals(headers.method())) {
1✔
497
        Http2Headers extraHeaders = new DefaultHttp2Headers();
1✔
498
        extraHeaders.add(HttpHeaderNames.ALLOW, HTTP_METHOD);
1✔
499
        respondWithHttpError(ctx, streamId, 405, Status.Code.INTERNAL,
1✔
500
            String.format("Method '%s' is not supported", headers.method()), extraHeaders);
1✔
501
        return;
1✔
502
      }
503

504
      int h2HeadersSize = Utils.getH2HeadersSize(headers);
1✔
505
      if (Utils.shouldRejectOnMetadataSizeSoftLimitExceeded(
1✔
506
              h2HeadersSize, softLimitHeaderListSize, maxHeaderListSize)) {
507
        respondWithHttpError(ctx, streamId, 431, Status.Code.RESOURCE_EXHAUSTED, String.format(
×
508
                "Client Headers of size %d exceeded Metadata size soft limit: %d",
509
                h2HeadersSize,
×
510
                softLimitHeaderListSize));
×
511
        return;
×
512
      }
513

514
      if (!teWarningLogged && !TE_TRAILERS.contentEquals(headers.get(TE_HEADER))) {
1✔
515
        logger.warning(String.format("Expected header TE: %s, but %s is received. This means "
1✔
516
                + "some intermediate proxy may not support trailers",
517
            TE_TRAILERS, headers.get(TE_HEADER)));
1✔
518
        teWarningLogged = true;
1✔
519
      }
520

521
      // The Http2Stream object was put by AbstractHttp2ConnectionHandler before calling this
522
      // method.
523
      Http2Stream http2Stream = requireHttp2Stream(streamId);
1✔
524

525
      Metadata metadata = Utils.convertHeaders(headers);
1✔
526
      StatsTraceContext statsTraceCtx =
1✔
527
          StatsTraceContext.newServerContext(streamTracerFactories, method, metadata);
1✔
528

529
      NettyServerStream.TransportState state = new NettyServerStream.TransportState(
1✔
530
          this,
531
          ctx.channel().eventLoop(),
1✔
532
          http2Stream,
533
          maxMessageSize,
534
          statsTraceCtx,
535
          transportTracer,
536
          method);
537

538
      try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.onHeadersRead")) {
1✔
539
        PerfMark.attachTag(state.tag());
1✔
540
        String authority = getOrUpdateAuthority((AsciiString) headers.authority());
1✔
541
        NettyServerStream stream = new NettyServerStream(
1✔
542
            ctx.channel(),
1✔
543
            state,
544
            attributes,
545
            authority,
546
            statsTraceCtx);
547
        transportListener.streamCreated(stream, method, metadata);
1✔
548
        state.onStreamAllocated();
1✔
549
        http2Stream.setProperty(streamKey, state);
1✔
550
      }
551
    } catch (Exception e) {
×
552
      logger.log(Level.WARNING, "Exception in onHeadersRead()", e);
×
553
      // Throw an exception that will get handled by onStreamError.
554
      throw newStreamException(streamId, e);
×
555
    }
1✔
556
  }
1✔
557

558
  private String getOrUpdateAuthority(AsciiString authority) {
559
    if (authority == null) {
1✔
560
      return null;
1✔
561
    } else if (!authority.equals(lastKnownAuthority)) {
1✔
562
      lastKnownAuthority = authority;
1✔
563
    }
564

565
    // AsciiString.toString() is internally cached, so subsequent calls will not
566
    // result in recomputing the String representation of lastKnownAuthority.
567
    return lastKnownAuthority.toString();
1✔
568
  }
569

570
  private void onDataRead(int streamId, ByteBuf data, int padding, boolean endOfStream)
571
      throws Http2Exception {
572
    flowControlPing().onDataRead(data.readableBytes(), padding);
1✔
573
    try {
574
      NettyServerStream.TransportState stream = serverStream(requireHttp2Stream(streamId));
1✔
575
      if (stream == null) {
1✔
576
        return;
1✔
577
      }
578
      try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.onDataRead")) {
1✔
579
        PerfMark.attachTag(stream.tag());
1✔
580
        stream.inboundDataReceived(data, endOfStream);
1✔
581
      }
582
    } catch (Throwable e) {
×
583
      logger.log(Level.WARNING, "Exception in onDataRead()", e);
×
584
      // Throw an exception that will get handled by onStreamError.
585
      throw newStreamException(streamId, e);
×
586
    }
1✔
587
  }
1✔
588

589
  private void onRstStreamRead(int streamId, long errorCode) throws Http2Exception {
590
    Http2Exception tooManyRstStream = rstStreamCounter.countRstStream();
1✔
591
    if (tooManyRstStream != null) {
1✔
592
      throw tooManyRstStream;
1✔
593
    }
594

595
    try {
596
      NettyServerStream.TransportState stream = serverStream(connection().stream(streamId));
1✔
597
      if (stream != null) {
1✔
598
        try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.onRstStreamRead")) {
1✔
599
          PerfMark.attachTag(stream.tag());
1✔
600
          stream.transportReportStatus(
1✔
601
              Status.CANCELLED.withDescription("RST_STREAM received for code " + errorCode));
1✔
602
        }
603
      }
604
    } catch (Throwable e) {
×
605
      logger.log(Level.WARNING, "Exception in onRstStreamRead()", e);
×
606
      // Throw an exception that will get handled by onStreamError.
607
      throw newStreamException(streamId, e);
×
608
    }
1✔
609
  }
1✔
610

611
  @Override
612
  protected void onConnectionError(ChannelHandlerContext ctx, boolean outbound, Throwable cause,
613
      Http2Exception http2Ex) {
614
    logger.log(Level.FINE, "Connection Error", cause);
1✔
615
    connectionError = cause;
1✔
616
    super.onConnectionError(ctx, outbound, cause, http2Ex);
1✔
617
  }
1✔
618

619
  @Override
620
  protected void onStreamError(ChannelHandlerContext ctx, boolean outbound, Throwable cause,
621
      StreamException http2Ex) {
622
    NettyServerStream.TransportState serverStream = serverStream(
1✔
623
        connection().stream(Http2Exception.streamId(http2Ex)));
1✔
624
    Level level = Level.WARNING;
1✔
625
    if (serverStream == null && http2Ex.error() == Http2Error.STREAM_CLOSED) {
1✔
626
      level = Level.FINE;
1✔
627
    }
628
    logger.log(level, "Stream Error", cause);
1✔
629
    Tag tag = serverStream != null ? serverStream.tag() : PerfMark.createTag();
1✔
630
    try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.onStreamError")) {
1✔
631
      PerfMark.attachTag(tag);
1✔
632
      if (serverStream != null) {
1✔
633
        serverStream.transportReportStatus(Utils.statusFromThrowable(cause));
1✔
634
      }
635
      // TODO(ejona): Abort the stream by sending headers to help the client with debugging.
636
      // Delegate to the base class to send a RST_STREAM.
637
      super.onStreamError(ctx, outbound, cause, http2Ex);
1✔
638
    }
639
  }
1✔
640

641
  @Override
642
  public void handleProtocolNegotiationCompleted(
643
      Attributes attrs, InternalChannelz.Security securityInfo) {
644
    negotiationAttributes = attrs;
1✔
645
    this.securityInfo = securityInfo;
1✔
646
    super.handleProtocolNegotiationCompleted(attrs, securityInfo);
1✔
647
    NettyClientHandler.writeBufferingAndRemove(ctx().channel());
1✔
648
  }
1✔
649

650
  @Override
651
  public Attributes getEagAttributes() {
652
    return eagAttributes;
1✔
653
  }
654

655
  InternalChannelz.Security getSecurityInfo() {
656
    return securityInfo;
1✔
657
  }
658

659
  @VisibleForTesting
660
  KeepAliveManager getKeepAliveManagerForTest() {
661
    return keepAliveManager;
1✔
662
  }
663

664
  @VisibleForTesting
665
  void setKeepAliveManagerForTest(KeepAliveManager keepAliveManager) {
666
    this.keepAliveManager = keepAliveManager;
1✔
667
  }
1✔
668

669
  /**
670
   * Handler for the Channel shutting down.
671
   */
672
  @Override
673
  public void channelActive(ChannelHandlerContext ctx) throws Exception {
674
    tcpMetrics.channelActive(ctx.channel());
1✔
675
    super.channelActive(ctx);
1✔
676
  }
1✔
677

678
  @Override
679
  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
680
    try {
681
      tcpMetrics.channelInactive(ctx.channel());
1✔
682
      if (keepAliveManager != null) {
1✔
683
        keepAliveManager.onTransportTermination();
1✔
684
      }
685
      if (maxConnectionIdleManager != null) {
1✔
686
        maxConnectionIdleManager.onTransportTermination();
1✔
687
      }
688
      if (maxConnectionAgeMonitor != null) {
1✔
689
        maxConnectionAgeMonitor.cancel(false);
1✔
690
      }
691
      final Status status =
1✔
692
          Status.UNAVAILABLE.withDescription("connection terminated for unknown reason");
1✔
693
      // Any streams that are still active must be closed
694
      connection().forEachActiveStream(new Http2StreamVisitor() {
1✔
695
        @Override
696
        public boolean visit(Http2Stream stream) throws Http2Exception {
697
          NettyServerStream.TransportState serverStream = serverStream(stream);
1✔
698
          if (serverStream != null) {
1✔
699
            serverStream.transportReportStatus(status);
1✔
700
          }
701
          return true;
1✔
702
        }
703
      });
704
    } finally {
705
      super.channelInactive(ctx);
1✔
706
    }
707
  }
1✔
708

709
  WriteQueue getWriteQueue() {
710
    return serverWriteQueue;
1✔
711
  }
712

713
  /** Handler for commands sent from the stream. */
714
  @Override
715
  public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
716
      throws Exception {
717
    if (msg instanceof SendGrpcFrameCommand) {
1✔
718
      sendGrpcFrame(ctx, (SendGrpcFrameCommand) msg, promise);
1✔
719
    } else if (msg instanceof SendResponseHeadersCommand) {
1✔
720
      sendResponseHeaders(ctx, (SendResponseHeadersCommand) msg, promise);
1✔
721
    } else if (msg instanceof CancelServerStreamCommand) {
1✔
722
      cancelStream(ctx, (CancelServerStreamCommand) msg, promise);
1✔
723
    } else if (msg instanceof GracefulServerCloseCommand) {
1✔
724
      gracefulClose(ctx, (GracefulServerCloseCommand) msg, promise);
1✔
725
    } else if (msg instanceof ForcefulCloseCommand) {
1✔
726
      forcefulClose(ctx, (ForcefulCloseCommand) msg, promise);
1✔
727
    } else {
728
      AssertionError e =
×
729
          new AssertionError("Write called for unexpected type: " + msg.getClass().getName());
×
730
      ReferenceCountUtil.release(msg);
×
731
      promise.setFailure(e);
×
732
      throw e;
×
733
    }
734
  }
1✔
735

736
  @Override
737
  public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
738
    gracefulClose(ctx, new GracefulServerCloseCommand("app_requested"), promise);
1✔
739
    ctx.flush();
1✔
740
  }
1✔
741

742
  /**
743
   * Returns the given processed bytes back to inbound flow control.
744
   */
745
  void returnProcessedBytes(Http2Stream http2Stream, int bytes) {
746
    try {
747
      decoder().flowController().consumeBytes(http2Stream, bytes);
1✔
748
    } catch (Http2Exception e) {
×
749
      throw new RuntimeException(e);
×
750
    }
1✔
751
  }
1✔
752

753
  private void closeStreamWhenDone(ChannelPromise promise, Http2Stream stream) {
754
    promise.addListener(
1✔
755
        new ChannelFutureListener() {
1✔
756
          @Override
757
          public void operationComplete(ChannelFuture future) {
758
            serverStream(stream).complete();
1✔
759
          }
1✔
760
        });
761
  }
1✔
762

763
  private static void streamGone(int streamId, ChannelPromise promise) {
764
    promise.setFailure(
1✔
765
        new IllegalStateException(
766
            "attempting to write to stream " + streamId + " that no longer exists") {
1✔
767
          @Override
768
          public synchronized Throwable fillInStackTrace() {
769
            return this;
1✔
770
          }
771
        });
772
  }
1✔
773

774
  /** Sends the given gRPC frame to the client. */
775
  private void sendGrpcFrame(
776
      ChannelHandlerContext ctx, SendGrpcFrameCommand cmd, ChannelPromise promise)
777
      throws Http2Exception {
778
    try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.sendGrpcFrame")) {
1✔
779
      PerfMark.attachTag(cmd.stream().tag());
1✔
780
      PerfMark.linkIn(cmd.getLink());
1✔
781
      int streamId = cmd.stream().id();
1✔
782
      Http2Stream stream = connection().stream(streamId);
1✔
783
      if (stream == null) {
1✔
784
        cmd.release();
1✔
785
        streamGone(streamId, promise);
1✔
786
        return;
1✔
787
      }
788
      if (cmd.endStream()) {
1✔
789
        closeStreamWhenDone(promise, stream);
×
790
      }
791
      // Call the base class to write the HTTP/2 DATA frame.
792
      encoder().writeData(ctx, streamId, cmd.content(), 0, cmd.endStream(), promise);
1✔
793
    }
794
  }
1✔
795

796
  /**
797
   * Sends the response headers to the client.
798
   */
799
  private void sendResponseHeaders(ChannelHandlerContext ctx, SendResponseHeadersCommand cmd,
800
      ChannelPromise promise) throws Http2Exception {
801
    try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.sendResponseHeaders")) {
1✔
802
      PerfMark.attachTag(cmd.stream().tag());
1✔
803
      PerfMark.linkIn(cmd.getLink());
1✔
804
      int streamId = cmd.stream().id();
1✔
805
      Http2Stream stream = connection().stream(streamId);
1✔
806
      if (stream == null) {
1✔
807
        streamGone(streamId, promise);
1✔
808
        return;
1✔
809
      }
810
      if (cmd.endOfStream()) {
1✔
811
        closeStreamWhenDone(promise, stream);
1✔
812
      }
813
      encoder().writeHeaders(ctx, streamId, cmd.headers(), 0, cmd.endOfStream(), promise);
1✔
814
    }
815
  }
1✔
816

817
  private void cancelStream(ChannelHandlerContext ctx, CancelServerStreamCommand cmd,
818
      ChannelPromise promise) {
819
    try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.cancelStream")) {
1✔
820
      PerfMark.attachTag(cmd.stream().tag());
1✔
821
      PerfMark.linkIn(cmd.getLink());
1✔
822
      // Notify the listener if we haven't already.
823
      cmd.stream().transportReportStatus(cmd.reason());
1✔
824

825
      // Now we need to decide how we're going to notify the peer that this stream is closed.
826
      // If possible, it's nice to inform the peer _why_ this stream was cancelled by sending
827
      // a structured headers frame.
828
      if (shouldCloseStreamWithHeaders(cmd, connection())) {
1✔
829
        Metadata md = new Metadata();
1✔
830
        md.put(InternalStatus.CODE_KEY, cmd.reason());
1✔
831
        if (cmd.reason().getDescription() != null) {
1✔
832
          md.put(InternalStatus.MESSAGE_KEY, cmd.reason().getDescription());
1✔
833
        }
834
        Http2Headers headers = Utils.convertServerHeaders(md);
1✔
835
        encoder().writeHeaders(
1✔
836
            ctx, cmd.stream().id(), headers, /* padding = */ 0, /* endStream = */ true, promise);
1✔
837
      } else {
1✔
838
        // Terminate the stream.
839
        encoder().writeRstStream(ctx, cmd.stream().id(), Http2Error.CANCEL.code(), promise);
1✔
840
      }
841
    }
842
  }
1✔
843

844
  // Determine whether a CancelServerStreamCommand should try to close the stream with a
845
  // HEADERS or a RST_STREAM frame. The caller has some influence over this (they can
846
  // configure cmd.wantsHeaders()). The state of the stream also has an influence: we
847
  // only try to send HEADERS if the stream exists and hasn't already sent any headers.
848
  private static boolean shouldCloseStreamWithHeaders(
849
          CancelServerStreamCommand cmd, Http2Connection conn) {
850
    if (!cmd.wantsHeaders()) {
1✔
851
      return false;
1✔
852
    }
853
    Http2Stream stream = conn.stream(cmd.stream().id());
1✔
854
    return stream != null && !stream.isHeadersSent();
1✔
855
  }
856

857
  private void gracefulClose(final ChannelHandlerContext ctx, final GracefulServerCloseCommand msg,
858
      ChannelPromise promise) throws Exception {
859
    // Ideally we'd adjust a pre-existing graceful shutdown's grace period to at least what is
860
    // requested here. But that's an edge case and seems bug-prone.
861
    if (gracefulShutdown == null) {
1✔
862
      Long graceTimeInNanos = null;
1✔
863
      if (msg.getGraceTimeUnit() != null) {
1✔
864
        graceTimeInNanos = msg.getGraceTimeUnit().toNanos(msg.getGraceTime());
1✔
865
      }
866
      gracefulShutdown = new GracefulShutdown(msg.getGoAwayDebugString(), graceTimeInNanos);
1✔
867
      gracefulShutdown.start(ctx);
1✔
868
    }
869
    promise.setSuccess();
1✔
870
  }
1✔
871

872
  private void forcefulClose(final ChannelHandlerContext ctx, final ForcefulCloseCommand msg,
873
      ChannelPromise promise) throws Exception {
874
    super.close(ctx, promise);
1✔
875
    connection().forEachActiveStream(new Http2StreamVisitor() {
1✔
876
      @Override
877
      public boolean visit(Http2Stream stream) throws Http2Exception {
878
        NettyServerStream.TransportState serverStream = serverStream(stream);
1✔
879
        if (serverStream != null) {
1✔
880
          try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.forcefulClose")) {
1✔
881
            PerfMark.attachTag(serverStream.tag());
1✔
882
            PerfMark.linkIn(msg.getLink());
1✔
883
            serverStream.transportReportStatus(msg.getStatus());
1✔
884
            resetStream(ctx, stream.id(), Http2Error.CANCEL.code(), ctx.newPromise());
1✔
885
          }
886
        }
887
        stream.close();
1✔
888
        return true;
1✔
889
      }
890
    });
891
  }
1✔
892

893
  private void respondWithHttpError(
894
      ChannelHandlerContext ctx, int streamId, int code, Status.Code statusCode, String msg) {
895
    respondWithHttpError(ctx, streamId, code, statusCode, msg, EmptyHttp2Headers.INSTANCE);
1✔
896
  }
1✔
897

898
  private void respondWithHttpError(
899
      ChannelHandlerContext ctx, int streamId, int code, Status.Code statusCode, String msg,
900
      Http2Headers extraHeaders) {
901
    Metadata metadata = new Metadata();
1✔
902
    metadata.put(InternalStatus.CODE_KEY, statusCode.toStatus());
1✔
903
    metadata.put(InternalStatus.MESSAGE_KEY, msg);
1✔
904
    byte[][] serialized = InternalMetadata.serialize(metadata);
1✔
905

906
    Http2Headers headers = new DefaultHttp2Headers(true, serialized.length / 2)
1✔
907
        .status("" + code)
1✔
908
        .set(CONTENT_TYPE_HEADER, "text/plain; charset=utf-8");
1✔
909
    for (int i = 0; i < serialized.length; i += 2) {
1✔
910
      headers.add(new AsciiString(serialized[i], false), new AsciiString(serialized[i + 1], false));
1✔
911
    }
912
    headers.add(extraHeaders);
1✔
913
    encoder().writeHeaders(ctx, streamId, headers, 0, false, ctx.newPromise());
1✔
914
    ByteBuf msgBuf = ByteBufUtil.writeUtf8(ctx.alloc(), msg);
1✔
915
    encoder().writeData(ctx, streamId, msgBuf, 0, true, ctx.newPromise());
1✔
916
  }
1✔
917

918
  private Http2Stream requireHttp2Stream(int streamId) {
919
    Http2Stream stream = connection().stream(streamId);
1✔
920
    if (stream == null) {
1✔
921
      // This should never happen.
922
      throw new AssertionError("Stream does not exist: " + streamId);
×
923
    }
924
    return stream;
1✔
925
  }
926

927
  /**
928
   * Returns the server stream associated to the given HTTP/2 stream object.
929
   */
930
  private NettyServerStream.TransportState serverStream(Http2Stream stream) {
931
    return stream == null ? null : (NettyServerStream.TransportState) stream.getProperty(streamKey);
1✔
932
  }
933

934
  private Http2Exception newStreamException(int streamId, Throwable cause) {
935
    return Http2Exception.streamError(
×
936
        streamId, Http2Error.INTERNAL_ERROR, cause, Strings.nullToEmpty(cause.getMessage()));
×
937
  }
938

939
  private class FrameListener extends Http2FrameAdapter {
1✔
940
    private boolean firstSettings = true;
1✔
941

942
    @Override
943
    public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) {
944
      if (firstSettings) {
1✔
945
        firstSettings = false;
1✔
946
        // Delay transportReady until we see the client's HTTP handshake, for coverage with
947
        // handshakeTimeout
948
        attributes = transportListener.transportReady(negotiationAttributes);
1✔
949
      }
950
    }
1✔
951

952
    @Override
953
    public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
954
        boolean endOfStream) throws Http2Exception {
955
      if (keepAliveManager != null) {
1✔
956
        keepAliveManager.onDataReceived();
1✔
957
      }
958
      NettyServerHandler.this.onDataRead(streamId, data, padding, endOfStream);
1✔
959
      return padding;
1✔
960
    }
961

962
    @Override
963
    public void onHeadersRead(ChannelHandlerContext ctx,
964
        int streamId,
965
        Http2Headers headers,
966
        int streamDependency,
967
        short weight,
968
        boolean exclusive,
969
        int padding,
970
        boolean endStream) throws Http2Exception {
971
      if (keepAliveManager != null) {
1✔
972
        keepAliveManager.onDataReceived();
1✔
973
      }
974
      NettyServerHandler.this.onHeadersRead(ctx, streamId, headers);
1✔
975
      if (endStream) {
1✔
976
        NettyServerHandler.this.onDataRead(streamId, Unpooled.EMPTY_BUFFER, 0, endStream);
×
977
      }
978
    }
1✔
979

980
    @Override
981
    public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode)
982
        throws Http2Exception {
983
      if (keepAliveManager != null) {
1✔
984
        keepAliveManager.onDataReceived();
1✔
985
      }
986
      NettyServerHandler.this.onRstStreamRead(streamId, errorCode);
1✔
987
    }
1✔
988

989
    @Override
990
    public void onPingRead(ChannelHandlerContext ctx, long data) throws Http2Exception {
991
      if (keepAliveManager != null) {
1✔
992
        keepAliveManager.onDataReceived();
1✔
993
      }
994
      if (!keepAliveEnforcer.pingAcceptable()) {
1✔
995
        ByteBuf debugData = ByteBufUtil.writeAscii(ctx.alloc(), "too_many_pings");
1✔
996
        goAway(ctx, connection().remote().lastStreamCreated(), Http2Error.ENHANCE_YOUR_CALM.code(),
1✔
997
            debugData, ctx.newPromise());
1✔
998
        Status status = Status.RESOURCE_EXHAUSTED.withDescription("Too many pings from client");
1✔
999
        try {
1000
          forcefulClose(ctx, new ForcefulCloseCommand(status), ctx.newPromise());
1✔
1001
        } catch (Exception ex) {
×
1002
          onError(ctx, /* outbound= */ true, ex);
×
1003
        }
1✔
1004
      }
1005
    }
1✔
1006

1007
    @Override
1008
    public void onPingAckRead(ChannelHandlerContext ctx, long data) throws Http2Exception {
1009
      if (keepAliveManager != null) {
1✔
1010
        keepAliveManager.onDataReceived();
1✔
1011
      }
1012
      if (data == flowControlPing().payload()) {
1✔
1013
        flowControlPing().updateWindow();
1✔
1014
        logger.log(Level.FINE, "Window: {0}",
1✔
1015
            decoder().flowController().initialWindowSize(connection().connectionStream()));
1✔
1016
      } else if (data == GRACEFUL_SHUTDOWN_PING) {
1✔
1017
        if (gracefulShutdown == null) {
1✔
1018
          // this should never happen
1019
          logger.warning("Received GRACEFUL_SHUTDOWN_PING Ack but gracefulShutdown is null");
×
1020
        } else {
1021
          gracefulShutdown.secondGoAwayAndClose(ctx);
1✔
1022
        }
1023
      } else if (data != KEEPALIVE_PING) {
1✔
1024
        logger.warning("Received unexpected ping ack. No ping outstanding");
×
1025
      }
1026
    }
1✔
1027
  }
1028

1029
  private final class KeepAlivePinger implements KeepAliveManager.KeepAlivePinger {
1030
    final ChannelHandlerContext ctx;
1031

1032
    KeepAlivePinger(ChannelHandlerContext ctx) {
1✔
1033
      this.ctx = ctx;
1✔
1034
    }
1✔
1035

1036
    @Override
1037
    public void ping() {
1038
      ChannelFuture pingFuture = encoder().writePing(
1✔
1039
          ctx, false /* isAck */, KEEPALIVE_PING, ctx.newPromise());
1✔
1040
      ctx.flush();
1✔
1041
      pingFuture.addListener(new ChannelFutureListener() {
1✔
1042
        @Override
1043
        public void operationComplete(ChannelFuture future) throws Exception {
1044
          if (future.isSuccess()) {
1✔
1045
            transportTracer.reportKeepAliveSent();
1✔
1046
          }
1047
        }
1✔
1048
      });
1049
    }
1✔
1050

1051
    @Override
1052
    public void onPingTimeout() {
1053
      try {
1054
        forcefulClose(
1✔
1055
            ctx,
1056
            new ForcefulCloseCommand(Status.UNAVAILABLE
1057
                .withDescription("Keepalive failed. The connection is likely gone")),
1✔
1058
            ctx.newPromise());
1✔
1059
      } catch (Exception ex) {
×
1060
        try {
1061
          exceptionCaught(ctx, ex);
×
1062
        } catch (Exception ex2) {
×
1063
          logger.log(Level.WARNING, "Exception while propagating exception", ex2);
×
1064
          logger.log(Level.WARNING, "Original failure", ex);
×
1065
        }
×
1066
      }
1✔
1067
    }
1✔
1068
  }
1069

1070
  private final class GracefulShutdown {
1071
    String goAwayMessage;
1072

1073
    /**
1074
     * The grace time between starting graceful shutdown and closing the netty channel,
1075
     * {@code null} is unspecified.
1076
     */
1077
    @CheckForNull
1078
    Long graceTimeInNanos;
1079

1080
    /**
1081
     * True if ping is Acked or ping is timeout.
1082
     */
1083
    boolean pingAckedOrTimeout;
1084

1085
    Future<?> pingFuture;
1086

1087
    GracefulShutdown(String goAwayMessage,
1088
        @Nullable Long graceTimeInNanos) {
1✔
1089
      this.goAwayMessage = goAwayMessage;
1✔
1090
      this.graceTimeInNanos = graceTimeInNanos;
1✔
1091
    }
1✔
1092

1093
    /**
1094
     * Sends out first GOAWAY and ping, and schedules second GOAWAY and close.
1095
     */
1096
    void start(final ChannelHandlerContext ctx) {
1097
      goAway(
1✔
1098
          ctx,
1099
          Integer.MAX_VALUE,
1100
          Http2Error.NO_ERROR.code(),
1✔
1101
          ByteBufUtil.writeAscii(ctx.alloc(), goAwayMessage),
1✔
1102
          ctx.newPromise());
1✔
1103

1104
      pingFuture = ctx.executor().schedule(
1✔
1105
          new Runnable() {
1✔
1106
            @Override
1107
            public void run() {
1108
              secondGoAwayAndClose(ctx);
1✔
1109
            }
1✔
1110
          },
1111
          GRACEFUL_SHUTDOWN_PING_TIMEOUT_NANOS,
1✔
1112
          TimeUnit.NANOSECONDS);
1113

1114
      encoder().writePing(ctx, false /* isAck */, GRACEFUL_SHUTDOWN_PING, ctx.newPromise());
1✔
1115
    }
1✔
1116

1117
    void secondGoAwayAndClose(ChannelHandlerContext ctx) {
1118
      if (pingAckedOrTimeout) {
1✔
1119
        return;
×
1120
      }
1121
      pingAckedOrTimeout = true;
1✔
1122

1123
      checkNotNull(pingFuture, "pingFuture");
1✔
1124
      pingFuture.cancel(false);
1✔
1125

1126
      // send the second GOAWAY with last stream id
1127
      goAway(
1✔
1128
          ctx,
1129
          connection().remote().lastStreamCreated(),
1✔
1130
          Http2Error.NO_ERROR.code(),
1✔
1131
          ByteBufUtil.writeAscii(ctx.alloc(), goAwayMessage),
1✔
1132
          ctx.newPromise());
1✔
1133

1134
      // gracefully shutdown with specified grace time
1135
      long savedGracefulShutdownTimeMillis = gracefulShutdownTimeoutMillis();
1✔
1136
      long overriddenGraceTime = graceTimeOverrideMillis(savedGracefulShutdownTimeMillis);
1✔
1137
      try {
1138
        gracefulShutdownTimeoutMillis(overriddenGraceTime);
1✔
1139
        NettyServerHandler.super.close(ctx, ctx.newPromise());
1✔
1140
      } catch (Exception e) {
×
1141
        onError(ctx, /* outbound= */ true, e);
×
1142
      } finally {
1143
        gracefulShutdownTimeoutMillis(savedGracefulShutdownTimeMillis);
1✔
1144
      }
1145
    }
1✔
1146

1147
    private long graceTimeOverrideMillis(long originalMillis) {
1148
      if (graceTimeInNanos == null) {
1✔
1149
        return originalMillis;
1✔
1150
      }
1151
      if (graceTimeInNanos == MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE) {
1✔
1152
        // netty treats -1 as "no timeout"
1153
        return -1L;
1✔
1154
      }
1155
      return TimeUnit.NANOSECONDS.toMillis(graceTimeInNanos);
1✔
1156
    }
1157
  }
1158

1159
  // Use a frame writer so that we know when frames are through flow control and actually being
1160
  // written.
1161
  private static class WriteMonitoringFrameWriter extends DecoratingHttp2FrameWriter {
1162
    private final KeepAliveEnforcer keepAliveEnforcer;
1163

1164
    public WriteMonitoringFrameWriter(Http2FrameWriter delegate,
1165
        KeepAliveEnforcer keepAliveEnforcer) {
1166
      super(delegate);
1✔
1167
      this.keepAliveEnforcer = keepAliveEnforcer;
1✔
1168
    }
1✔
1169

1170
    @Override
1171
    public ChannelFuture writeData(ChannelHandlerContext ctx, int streamId, ByteBuf data,
1172
        int padding, boolean endStream, ChannelPromise promise) {
1173
      keepAliveEnforcer.resetCounters();
1✔
1174
      return super.writeData(ctx, streamId, data, padding, endStream, promise);
1✔
1175
    }
1176

1177
    @Override
1178
    public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
1179
        int padding, boolean endStream, ChannelPromise promise) {
1180
      keepAliveEnforcer.resetCounters();
1✔
1181
      return super.writeHeaders(ctx, streamId, headers, padding, endStream, promise);
1✔
1182
    }
1183

1184
    @Override
1185
    public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
1186
        int streamDependency, short weight, boolean exclusive, int padding, boolean endStream,
1187
        ChannelPromise promise) {
1188
      keepAliveEnforcer.resetCounters();
×
1189
      return super.writeHeaders(ctx, streamId, headers, streamDependency, weight, exclusive,
×
1190
          padding, endStream, promise);
1191
    }
1192
  }
1193

1194
  private static final class Http2RstCounterEncoder extends DecoratingHttp2ConnectionEncoder {
1195
    private final RstStreamCounter rstStreamCounter;
1196
    private Http2LifecycleManager lifecycleManager;
1197

1198
    Http2RstCounterEncoder(Http2ConnectionEncoder encoder, RstStreamCounter rstStreamCounter) {
1199
      super(encoder);
1✔
1200
      this.rstStreamCounter = rstStreamCounter;
1✔
1201
    }
1✔
1202

1203
    @Override
1204
    public void lifecycleManager(Http2LifecycleManager lifecycleManager) {
1205
      this.lifecycleManager = lifecycleManager;
1✔
1206
      super.lifecycleManager(lifecycleManager);
1✔
1207
    }
1✔
1208

1209
    @Override
1210
    public ChannelFuture writeRstStream(
1211
        ChannelHandlerContext ctx, int streamId, long errorCode, ChannelPromise promise) {
1212
      ChannelFuture future = super.writeRstStream(ctx, streamId, errorCode, promise);
1✔
1213
      // We want to count "induced" RST_STREAM, where the server sent a reset because of a malformed
1214
      // frame.
1215
      boolean normalRst
1✔
1216
          = errorCode == Http2Error.NO_ERROR.code() || errorCode == Http2Error.CANCEL.code();
1✔
1217
      if (!normalRst) {
1✔
1218
        Http2Exception tooManyRstStream = rstStreamCounter.countRstStream();
1✔
1219
        if (tooManyRstStream != null) {
1✔
1220
          lifecycleManager.onError(ctx, true, tooManyRstStream);
1✔
1221
          ctx.close();
1✔
1222
        }
1223
      }
1224
      return future;
1✔
1225
    }
1226
  }
1227

1228
  private static final class RstStreamCounter {
1229
    private final int maxRstCount;
1230
    private final long maxRstPeriodNanos;
1231
    private final Ticker ticker;
1232
    private int rstCount;
1233
    private long lastRstNanoTime;
1234

1235
    RstStreamCounter(int maxRstCount, long maxRstPeriodNanos, Ticker ticker) {
1✔
1236
      checkArgument(maxRstCount >= 0, "maxRstCount must be non-negative: %s", maxRstCount);
1✔
1237
      this.maxRstCount = maxRstCount;
1✔
1238
      this.maxRstPeriodNanos = maxRstPeriodNanos;
1✔
1239
      this.ticker = checkNotNull(ticker, "ticker");
1✔
1240
      this.lastRstNanoTime = ticker.read();
1✔
1241
    }
1✔
1242

1243
    /** Returns non-{@code null} when the connection should be killed by the caller. */
1244
    private Http2Exception countRstStream() {
1245
      if (maxRstCount == 0) {
1✔
1246
        return null;
1✔
1247
      }
1248
      long now = ticker.read();
1✔
1249
      if (now - lastRstNanoTime > maxRstPeriodNanos) {
1✔
1250
        lastRstNanoTime = now;
1✔
1251
        rstCount = 1;
1✔
1252
      } else {
1253
        rstCount++;
1✔
1254
        if (rstCount > maxRstCount) {
1✔
1255
          return new Http2Exception(Http2Error.ENHANCE_YOUR_CALM, "too_many_rststreams") {
1✔
1256
            @SuppressWarnings("UnsynchronizedOverridesSynchronized") // No memory accesses
1257
            @Override
1258
            public Throwable fillInStackTrace() {
1259
              // Avoid the CPU cycles, since the resets may be a CPU consumption attack
1260
              return this;
1✔
1261
            }
1262
          };
1263
        }
1264
      }
1265
      return null;
1✔
1266
    }
1267
  }
1268

1269
  private static class ServerChannelLogger extends ChannelLogger {
1270
    private static final Logger log = Logger.getLogger(ChannelLogger.class.getName());
1✔
1271

1272
    @Override
1273
    public void log(ChannelLogLevel level, String message) {
1274
      log.log(toJavaLogLevel(level), message);
1✔
1275
    }
1✔
1276

1277
    @Override
1278
    public void log(ChannelLogLevel level, String messageFormat, Object... args) {
1279
      log(level, MessageFormat.format(messageFormat, args));
1✔
1280
    }
1✔
1281
  }
1282

1283
  private static Level toJavaLogLevel(ChannelLogLevel level) {
1284
    switch (level) {
1✔
1285
      case ERROR:
1286
        return Level.FINE;
×
1287
      case WARNING:
1288
        return Level.FINER;
×
1289
      default:
1290
        return Level.FINEST;
1✔
1291
    }
1292
  }
1293
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc