• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19970

29 Aug 2025 09:01PM UTC coverage: 88.532% (-0.06%) from 88.588%
#19970

push

github

web-flow
Upgrade guava version to 33.4.8 (#12219)

Guava seems to call a deprecated sun.misc.Unsafe::objectFieldOffset
method which might be removed in a future JDK release.
There are still a few things to do for -android versions, which are
tracked in https://github.com/google/guava/issues/7742,
see details at google/guava#7811

Fixes #12215

34678 of 39170 relevant lines covered (88.53%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.82
/../netty/src/main/java/io/grpc/netty/NettyServerHandler.java
1
/*
2
 * Copyright 2014 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.netty;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static io.grpc.internal.GrpcUtil.SERVER_KEEPALIVE_TIME_NANOS_DISABLED;
22
import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE;
23
import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_AGE_NANOS_DISABLED;
24
import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_IDLE_NANOS_DISABLED;
25
import static io.grpc.netty.Utils.CONTENT_TYPE_HEADER;
26
import static io.grpc.netty.Utils.HTTP_METHOD;
27
import static io.grpc.netty.Utils.TE_HEADER;
28
import static io.grpc.netty.Utils.TE_TRAILERS;
29
import static io.netty.handler.codec.http.HttpHeaderNames.CONNECTION;
30
import static io.netty.handler.codec.http.HttpHeaderNames.HOST;
31
import static io.netty.handler.codec.http2.DefaultHttp2LocalFlowController.DEFAULT_WINDOW_UPDATE_RATIO;
32
import static io.netty.handler.codec.http2.Http2Headers.PseudoHeaderName.AUTHORITY;
33

34
import com.google.common.annotations.VisibleForTesting;
35
import com.google.common.base.Preconditions;
36
import com.google.common.base.Strings;
37
import com.google.common.base.Ticker;
38
import io.grpc.Attributes;
39
import io.grpc.ChannelLogger;
40
import io.grpc.ChannelLogger.ChannelLogLevel;
41
import io.grpc.InternalChannelz;
42
import io.grpc.InternalMetadata;
43
import io.grpc.InternalStatus;
44
import io.grpc.Metadata;
45
import io.grpc.ServerStreamTracer;
46
import io.grpc.Status;
47
import io.grpc.internal.GrpcUtil;
48
import io.grpc.internal.KeepAliveEnforcer;
49
import io.grpc.internal.KeepAliveManager;
50
import io.grpc.internal.LogExceptionRunnable;
51
import io.grpc.internal.MaxConnectionIdleManager;
52
import io.grpc.internal.ServerTransportListener;
53
import io.grpc.internal.StatsTraceContext;
54
import io.grpc.internal.TransportTracer;
55
import io.grpc.netty.GrpcHttp2HeadersUtils.GrpcHttp2ServerHeadersDecoder;
56
import io.netty.buffer.ByteBuf;
57
import io.netty.buffer.ByteBufUtil;
58
import io.netty.buffer.Unpooled;
59
import io.netty.channel.ChannelFuture;
60
import io.netty.channel.ChannelFutureListener;
61
import io.netty.channel.ChannelHandlerContext;
62
import io.netty.channel.ChannelPromise;
63
import io.netty.handler.codec.http2.DecoratingHttp2ConnectionEncoder;
64
import io.netty.handler.codec.http2.DecoratingHttp2FrameWriter;
65
import io.netty.handler.codec.http2.DefaultHttp2Connection;
66
import io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder;
67
import io.netty.handler.codec.http2.DefaultHttp2ConnectionEncoder;
68
import io.netty.handler.codec.http2.DefaultHttp2FrameReader;
69
import io.netty.handler.codec.http2.DefaultHttp2FrameWriter;
70
import io.netty.handler.codec.http2.DefaultHttp2Headers;
71
import io.netty.handler.codec.http2.DefaultHttp2LocalFlowController;
72
import io.netty.handler.codec.http2.DefaultHttp2RemoteFlowController;
73
import io.netty.handler.codec.http2.Http2Connection;
74
import io.netty.handler.codec.http2.Http2ConnectionAdapter;
75
import io.netty.handler.codec.http2.Http2ConnectionDecoder;
76
import io.netty.handler.codec.http2.Http2ConnectionEncoder;
77
import io.netty.handler.codec.http2.Http2Error;
78
import io.netty.handler.codec.http2.Http2Exception;
79
import io.netty.handler.codec.http2.Http2Exception.StreamException;
80
import io.netty.handler.codec.http2.Http2FrameAdapter;
81
import io.netty.handler.codec.http2.Http2FrameLogger;
82
import io.netty.handler.codec.http2.Http2FrameReader;
83
import io.netty.handler.codec.http2.Http2FrameWriter;
84
import io.netty.handler.codec.http2.Http2Headers;
85
import io.netty.handler.codec.http2.Http2HeadersDecoder;
86
import io.netty.handler.codec.http2.Http2InboundFrameLogger;
87
import io.netty.handler.codec.http2.Http2LifecycleManager;
88
import io.netty.handler.codec.http2.Http2OutboundFrameLogger;
89
import io.netty.handler.codec.http2.Http2Settings;
90
import io.netty.handler.codec.http2.Http2Stream;
91
import io.netty.handler.codec.http2.Http2StreamVisitor;
92
import io.netty.handler.codec.http2.UniformStreamByteDistributor;
93
import io.netty.handler.logging.LogLevel;
94
import io.netty.util.AsciiString;
95
import io.netty.util.ReferenceCountUtil;
96
import io.perfmark.PerfMark;
97
import io.perfmark.Tag;
98
import io.perfmark.TaskCloseable;
99
import java.text.MessageFormat;
100
import java.util.List;
101
import java.util.concurrent.Future;
102
import java.util.concurrent.ScheduledFuture;
103
import java.util.concurrent.TimeUnit;
104
import java.util.logging.Level;
105
import java.util.logging.Logger;
106
import javax.annotation.CheckForNull;
107
import javax.annotation.Nullable;
108

109
/**
110
 * Server-side Netty handler for gRPC processing. All event handlers are executed entirely within
111
 * the context of the Netty Channel thread.
112
 */
113
class NettyServerHandler extends AbstractNettyHandler {
114
  private static final Logger logger = Logger.getLogger(NettyServerHandler.class.getName());
1✔
115
  private static final long KEEPALIVE_PING = 0xDEADL;
116
  @VisibleForTesting
117
  static final long GRACEFUL_SHUTDOWN_PING = 0x97ACEF001L;
118
  private static final long GRACEFUL_SHUTDOWN_PING_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(10);
1✔
119
  /** Temporary workaround for #8674. Fine to delete after v1.45 release, and maybe earlier. */
120
  private static final boolean DISABLE_CONNECTION_HEADER_CHECK = Boolean.parseBoolean(
1✔
121
      System.getProperty("io.grpc.netty.disableConnectionHeaderCheck", "false"));
1✔
122

123
  private final Http2Connection.PropertyKey streamKey;
124
  private final ServerTransportListener transportListener;
125
  private final int maxMessageSize;
126
  private final long keepAliveTimeInNanos;
127
  private final long keepAliveTimeoutInNanos;
128
  private final long maxConnectionAgeInNanos;
129
  private final long maxConnectionAgeGraceInNanos;
130
  private final RstStreamCounter rstStreamCounter;
131
  private final List<? extends ServerStreamTracer.Factory> streamTracerFactories;
132
  private final TransportTracer transportTracer;
133
  private final KeepAliveEnforcer keepAliveEnforcer;
134
  private final Attributes eagAttributes;
135
  /** Incomplete attributes produced by negotiator. */
136
  private Attributes negotiationAttributes;
137
  private InternalChannelz.Security securityInfo;
138
  /** Completed attributes produced by transportReady. */
139
  private Attributes attributes;
140
  private Throwable connectionError;
141
  private boolean teWarningLogged;
142
  private WriteQueue serverWriteQueue;
143
  private AsciiString lastKnownAuthority;
144
  @CheckForNull
145
  private KeepAliveManager keepAliveManager;
146
  @CheckForNull
147
  private MaxConnectionIdleManager maxConnectionIdleManager;
148
  @CheckForNull
149
  private ScheduledFuture<?> maxConnectionAgeMonitor;
150
  @CheckForNull
151
  private GracefulShutdown gracefulShutdown;
152

153
  static NettyServerHandler newHandler(
154
      ServerTransportListener transportListener,
155
      ChannelPromise channelUnused,
156
      List<? extends ServerStreamTracer.Factory> streamTracerFactories,
157
      TransportTracer transportTracer,
158
      int maxStreams,
159
      boolean autoFlowControl,
160
      int flowControlWindow,
161
      int maxHeaderListSize,
162
      int softLimitHeaderListSize,
163
      int maxMessageSize,
164
      long keepAliveTimeInNanos,
165
      long keepAliveTimeoutInNanos,
166
      long maxConnectionIdleInNanos,
167
      long maxConnectionAgeInNanos,
168
      long maxConnectionAgeGraceInNanos,
169
      boolean permitKeepAliveWithoutCalls,
170
      long permitKeepAliveTimeInNanos,
171
      int maxRstCount,
172
      long maxRstPeriodNanos,
173
      Attributes eagAttributes) {
174
    Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive: %s",
1✔
175
        maxHeaderListSize);
176
    Http2FrameLogger frameLogger = new Http2FrameLogger(LogLevel.DEBUG, NettyServerHandler.class);
1✔
177
    Http2HeadersDecoder headersDecoder = new GrpcHttp2ServerHeadersDecoder(maxHeaderListSize);
1✔
178
    Http2FrameReader frameReader = new Http2InboundFrameLogger(
1✔
179
        new DefaultHttp2FrameReader(headersDecoder), frameLogger);
180
    Http2FrameWriter frameWriter =
1✔
181
        new Http2OutboundFrameLogger(new DefaultHttp2FrameWriter(), frameLogger);
182
    return newHandler(
1✔
183
        channelUnused,
184
        frameReader,
185
        frameWriter,
186
        transportListener,
187
        streamTracerFactories,
188
        transportTracer,
189
        maxStreams,
190
        autoFlowControl,
191
        flowControlWindow,
192
        maxHeaderListSize,
193
        softLimitHeaderListSize,
194
        maxMessageSize,
195
        keepAliveTimeInNanos,
196
        keepAliveTimeoutInNanos,
197
        maxConnectionIdleInNanos,
198
        maxConnectionAgeInNanos,
199
        maxConnectionAgeGraceInNanos,
200
        permitKeepAliveWithoutCalls,
201
        permitKeepAliveTimeInNanos,
202
        maxRstCount,
203
        maxRstPeriodNanos,
204
        eagAttributes,
205
        Ticker.systemTicker());
1✔
206
  }
207

208
  static NettyServerHandler newHandler(
209
      ChannelPromise channelUnused,
210
      Http2FrameReader frameReader,
211
      Http2FrameWriter frameWriter,
212
      ServerTransportListener transportListener,
213
      List<? extends ServerStreamTracer.Factory> streamTracerFactories,
214
      TransportTracer transportTracer,
215
      int maxStreams,
216
      boolean autoFlowControl,
217
      int flowControlWindow,
218
      int maxHeaderListSize,
219
      int softLimitHeaderListSize,
220
      int maxMessageSize,
221
      long keepAliveTimeInNanos,
222
      long keepAliveTimeoutInNanos,
223
      long maxConnectionIdleInNanos,
224
      long maxConnectionAgeInNanos,
225
      long maxConnectionAgeGraceInNanos,
226
      boolean permitKeepAliveWithoutCalls,
227
      long permitKeepAliveTimeInNanos,
228
      int maxRstCount,
229
      long maxRstPeriodNanos,
230
      Attributes eagAttributes,
231
      Ticker ticker) {
232
    Preconditions.checkArgument(maxStreams > 0, "maxStreams must be positive: %s", maxStreams);
1✔
233
    Preconditions.checkArgument(flowControlWindow > 0, "flowControlWindow must be positive: %s",
1✔
234
        flowControlWindow);
235
    Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive: %s",
1✔
236
        maxHeaderListSize);
237
    Preconditions.checkArgument(
1✔
238
        softLimitHeaderListSize > 0, "softLimitHeaderListSize must be positive: %s",
239
        softLimitHeaderListSize);
240
    Preconditions.checkArgument(maxMessageSize > 0, "maxMessageSize must be positive: %s",
1✔
241
        maxMessageSize);
242

243
    final Http2Connection connection = new DefaultHttp2Connection(true);
1✔
244
    UniformStreamByteDistributor dist = new UniformStreamByteDistributor(connection);
1✔
245
    dist.minAllocationChunk(MIN_ALLOCATED_CHUNK); // Increased for benchmarks performance.
1✔
246
    DefaultHttp2RemoteFlowController controller =
1✔
247
        new DefaultHttp2RemoteFlowController(connection, dist);
248
    connection.remote().flowController(controller);
1✔
249
    final KeepAliveEnforcer keepAliveEnforcer = new KeepAliveEnforcer(
1✔
250
        permitKeepAliveWithoutCalls, permitKeepAliveTimeInNanos, TimeUnit.NANOSECONDS);
251

252
    if (ticker == null) {
1✔
253
      ticker = Ticker.systemTicker();
×
254
    }
255

256
    RstStreamCounter rstStreamCounter
1✔
257
        = new RstStreamCounter(maxRstCount, maxRstPeriodNanos, ticker);
258
    // Create the local flow controller configured to auto-refill the connection window.
259
    connection.local().flowController(
1✔
260
        new DefaultHttp2LocalFlowController(connection, DEFAULT_WINDOW_UPDATE_RATIO, true));
261
    frameWriter = new WriteMonitoringFrameWriter(frameWriter, keepAliveEnforcer);
1✔
262
    Http2ConnectionEncoder encoder =
1✔
263
        new DefaultHttp2ConnectionEncoder(connection, frameWriter);
264
    encoder = new Http2ControlFrameLimitEncoder(encoder, 10000);
1✔
265
    encoder = new Http2RstCounterEncoder(encoder, rstStreamCounter);
1✔
266
    Http2ConnectionDecoder decoder = new DefaultHttp2ConnectionDecoder(connection, encoder,
1✔
267
        frameReader);
268

269
    Http2Settings settings = new Http2Settings();
1✔
270
    settings.initialWindowSize(flowControlWindow);
1✔
271
    settings.maxConcurrentStreams(maxStreams);
1✔
272
    settings.maxHeaderListSize(maxHeaderListSize);
1✔
273

274
    return new NettyServerHandler(
1✔
275
        channelUnused,
276
        connection,
277
        transportListener,
278
        streamTracerFactories,
279
        transportTracer,
280
        decoder, encoder, settings,
281
        maxMessageSize,
282
        maxHeaderListSize,
283
        softLimitHeaderListSize,
284
        keepAliveTimeInNanos,
285
        keepAliveTimeoutInNanos,
286
        maxConnectionIdleInNanos,
287
        maxConnectionAgeInNanos, maxConnectionAgeGraceInNanos,
288
        keepAliveEnforcer,
289
        autoFlowControl,
290
        rstStreamCounter,
291
        eagAttributes, ticker);
292
  }
293

294
  /**
   * Builds the handler around an already-wired connection, decoder and encoder.
   *
   * <p>Registers a connection listener that notifies the keep-alive enforcer (and the max-idle
   * manager, when idle tracking is enabled) as the connection moves between having no active
   * streams and having at least one, stores the configuration, and installs the frame listener
   * on the decoder.
   */
  private NettyServerHandler(
      ChannelPromise channelUnused,
      final Http2Connection connection,
      ServerTransportListener transportListener,
      List<? extends ServerStreamTracer.Factory> streamTracerFactories,
      TransportTracer transportTracer,
      Http2ConnectionDecoder decoder,
      Http2ConnectionEncoder encoder,
      Http2Settings settings,
      int maxMessageSize,
      int maxHeaderListSize,
      int softLimitHeaderListSize,
      long keepAliveTimeInNanos,
      long keepAliveTimeoutInNanos,
      long maxConnectionIdleInNanos,
      long maxConnectionAgeInNanos,
      long maxConnectionAgeGraceInNanos,
      final KeepAliveEnforcer keepAliveEnforcer,
      boolean autoFlowControl,
      RstStreamCounter rstStreamCounter,
      Attributes eagAttributes,
      Ticker ticker) {
    super(
        channelUnused,
        decoder,
        encoder,
        settings,
        new ServerChannelLogger(),
        autoFlowControl,
        null,
        ticker,
        maxHeaderListSize,
        softLimitHeaderListSize);

    // Idle tracking is optional; a null manager means the feature is disabled.
    final MaxConnectionIdleManager idleManager =
        maxConnectionIdleInNanos == MAX_CONNECTION_IDLE_NANOS_DISABLED
            ? null
            : new MaxConnectionIdleManager(maxConnectionIdleInNanos);

    connection.addListener(new Http2ConnectionAdapter() {
      @Override
      public void onStreamActive(Http2Stream stream) {
        // Only the transition from zero to one active stream is of interest.
        if (connection.numActiveStreams() != 1) {
          return;
        }
        keepAliveEnforcer.onTransportActive();
        if (idleManager != null) {
          idleManager.onTransportActive();
        }
      }

      @Override
      public void onStreamClosed(Http2Stream stream) {
        // Only the transition back to zero active streams is of interest.
        if (connection.numActiveStreams() != 0) {
          return;
        }
        keepAliveEnforcer.onTransportIdle();
        if (idleManager != null) {
          idleManager.onTransportIdle();
        }
      }
    });

    checkArgument(maxMessageSize >= 0, "maxMessageSize must be non-negative: %s", maxMessageSize);
    this.maxMessageSize = maxMessageSize;
    this.keepAliveTimeInNanos = keepAliveTimeInNanos;
    this.keepAliveTimeoutInNanos = keepAliveTimeoutInNanos;
    this.maxConnectionIdleManager = idleManager;
    this.maxConnectionAgeInNanos = maxConnectionAgeInNanos;
    this.maxConnectionAgeGraceInNanos = maxConnectionAgeGraceInNanos;
    this.keepAliveEnforcer = checkNotNull(keepAliveEnforcer, "keepAliveEnforcer");
    this.rstStreamCounter = rstStreamCounter;
    this.eagAttributes = checkNotNull(eagAttributes, "eagAttributes");

    // Key under which per-stream transport state is attached to each Http2Stream.
    this.streamKey = encoder.connection().newKey();
    this.transportListener = checkNotNull(transportListener, "transportListener");
    this.streamTracerFactories = checkNotNull(streamTracerFactories, "streamTracerFactories");
    this.transportTracer = checkNotNull(transportTracer, "transportTracer");

    // Set the frame listener on the decoder.
    decoder().frameListener(new FrameListener());
  }
1✔
375

376
  @Nullable
377
  Throwable connectionError() {
378
    return connectionError;
1✔
379
  }
380

381
  @Override
382
  public void handlerAdded(final ChannelHandlerContext ctx) throws Exception {
383
    serverWriteQueue = new WriteQueue(ctx.channel());
1✔
384

385
    // init max connection age monitor
386
    if (maxConnectionAgeInNanos != MAX_CONNECTION_AGE_NANOS_DISABLED) {
1✔
387
      maxConnectionAgeMonitor = ctx.executor().schedule(
1✔
388
          new LogExceptionRunnable(new Runnable() {
1✔
389
            @Override
390
            public void run() {
391
              if (gracefulShutdown == null) {
1✔
392
                gracefulShutdown = new GracefulShutdown("max_age", maxConnectionAgeGraceInNanos);
1✔
393
                gracefulShutdown.start(ctx);
1✔
394
                ctx.flush();
1✔
395
              }
396
            }
1✔
397
          }),
398
          maxConnectionAgeInNanos,
399
          TimeUnit.NANOSECONDS);
400
    }
401

402
    if (maxConnectionIdleManager != null) {
1✔
403
      maxConnectionIdleManager.start(new Runnable() {
1✔
404
        @Override
405
        public void run() {
406
          if (gracefulShutdown == null) {
1✔
407
            gracefulShutdown = new GracefulShutdown("max_idle", null);
1✔
408
            gracefulShutdown.start(ctx);
1✔
409
            ctx.flush();
1✔
410
          }
411
        }
1✔
412
      }, ctx.executor());
1✔
413
    }
414

415
    if (keepAliveTimeInNanos != SERVER_KEEPALIVE_TIME_NANOS_DISABLED) {
1✔
416
      keepAliveManager = new KeepAliveManager(new KeepAlivePinger(ctx), ctx.executor(),
1✔
417
          keepAliveTimeInNanos, keepAliveTimeoutInNanos, true /* keepAliveDuringTransportIdle */);
418
      keepAliveManager.onTransportStarted();
1✔
419
    }
420

421
    assert encoder().connection().equals(decoder().connection());
1✔
422
    transportTracer.setFlowControlWindowReader(new Utils.FlowControlReader(encoder().connection()));
1✔
423

424
    super.handlerAdded(ctx);
1✔
425
  }
1✔
426

427
  private void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers)
428
      throws Http2Exception {
429
    try {
430
      // Connection-specific header fields makes a request malformed. Ideally this would be handled
431
      // by Netty. RFC 7540 section 8.1.2.2
432
      if (!DISABLE_CONNECTION_HEADER_CHECK && headers.contains(CONNECTION)) {
1✔
433
        resetStream(ctx, streamId, Http2Error.PROTOCOL_ERROR.code(), ctx.newPromise());
×
434
        return;
×
435
      }
436

437
      if (headers.authority() == null) {
1✔
438
        List<CharSequence> hosts = headers.getAll(HOST);
1✔
439
        if (hosts.size() > 1) {
1✔
440
          // RFC 7230 section 5.4
441
          respondWithHttpError(ctx, streamId, 400, Status.Code.INTERNAL,
1✔
442
              "Multiple host headers");
443
          return;
1✔
444
        }
445
        if (!hosts.isEmpty()) {
1✔
446
          headers.add(AUTHORITY.value(), hosts.get(0));
1✔
447
        }
448
      }
449
      headers.remove(HOST);
1✔
450

451
      // Remove the leading slash of the path and get the fully qualified method name
452
      CharSequence path = headers.path();
1✔
453

454
      if (path == null) {
1✔
455
        respondWithHttpError(ctx, streamId, 404, Status.Code.UNIMPLEMENTED,
1✔
456
            "Expected path but is missing");
457
        return;
1✔
458
      }
459

460
      if (path.charAt(0) != '/') {
1✔
461
        respondWithHttpError(ctx, streamId, 404, Status.Code.UNIMPLEMENTED,
1✔
462
            String.format("Expected path to start with /: %s", path));
1✔
463
        return;
1✔
464
      }
465

466
      String method = path.subSequence(1, path.length()).toString();
1✔
467

468
      // Verify that the Content-Type is correct in the request.
469
      CharSequence contentType = headers.get(CONTENT_TYPE_HEADER);
1✔
470
      if (contentType == null) {
1✔
471
        respondWithHttpError(
1✔
472
            ctx, streamId, 415, Status.Code.INTERNAL, "Content-Type is missing from the request");
473
        return;
1✔
474
      }
475
      String contentTypeString = contentType.toString();
1✔
476
      if (!GrpcUtil.isGrpcContentType(contentTypeString)) {
1✔
477
        respondWithHttpError(ctx, streamId, 415, Status.Code.INTERNAL,
1✔
478
            String.format("Content-Type '%s' is not supported", contentTypeString));
1✔
479
        return;
1✔
480
      }
481

482
      if (!HTTP_METHOD.contentEquals(headers.method())) {
1✔
483
        respondWithHttpError(ctx, streamId, 405, Status.Code.INTERNAL,
1✔
484
            String.format("Method '%s' is not supported", headers.method()));
1✔
485
        return;
1✔
486
      }
487

488
      int h2HeadersSize = Utils.getH2HeadersSize(headers);
1✔
489
      if (Utils.shouldRejectOnMetadataSizeSoftLimitExceeded(
1✔
490
              h2HeadersSize, softLimitHeaderListSize, maxHeaderListSize)) {
491
        respondWithHttpError(ctx, streamId, 431, Status.Code.RESOURCE_EXHAUSTED, String.format(
×
492
                "Client Headers of size %d exceeded Metadata size soft limit: %d",
493
                h2HeadersSize,
×
494
                softLimitHeaderListSize));
×
495
        return;
×
496
      }
497

498
      if (!teWarningLogged && !TE_TRAILERS.contentEquals(headers.get(TE_HEADER))) {
1✔
499
        logger.warning(String.format("Expected header TE: %s, but %s is received. This means "
1✔
500
                + "some intermediate proxy may not support trailers",
501
            TE_TRAILERS, headers.get(TE_HEADER)));
1✔
502
        teWarningLogged = true;
1✔
503
      }
504

505
      // The Http2Stream object was put by AbstractHttp2ConnectionHandler before calling this
506
      // method.
507
      Http2Stream http2Stream = requireHttp2Stream(streamId);
1✔
508

509
      Metadata metadata = Utils.convertHeaders(headers);
1✔
510
      StatsTraceContext statsTraceCtx =
1✔
511
          StatsTraceContext.newServerContext(streamTracerFactories, method, metadata);
1✔
512

513
      NettyServerStream.TransportState state = new NettyServerStream.TransportState(
1✔
514
          this,
515
          ctx.channel().eventLoop(),
1✔
516
          http2Stream,
517
          maxMessageSize,
518
          statsTraceCtx,
519
          transportTracer,
520
          method);
521

522
      try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.onHeadersRead")) {
1✔
523
        PerfMark.attachTag(state.tag());
1✔
524
        String authority = getOrUpdateAuthority((AsciiString) headers.authority());
1✔
525
        NettyServerStream stream = new NettyServerStream(
1✔
526
            ctx.channel(),
1✔
527
            state,
528
            attributes,
529
            authority,
530
            statsTraceCtx);
531
        transportListener.streamCreated(stream, method, metadata);
1✔
532
        state.onStreamAllocated();
1✔
533
        http2Stream.setProperty(streamKey, state);
1✔
534
      }
535
    } catch (Exception e) {
×
536
      logger.log(Level.WARNING, "Exception in onHeadersRead()", e);
×
537
      // Throw an exception that will get handled by onStreamError.
538
      throw newStreamException(streamId, e);
×
539
    }
1✔
540
  }
1✔
541

542
  private String getOrUpdateAuthority(AsciiString authority) {
543
    if (authority == null) {
1✔
544
      return null;
1✔
545
    } else if (!authority.equals(lastKnownAuthority)) {
1✔
546
      lastKnownAuthority = authority;
1✔
547
    }
548

549
    // AsciiString.toString() is internally cached, so subsequent calls will not
550
    // result in recomputing the String representation of lastKnownAuthority.
551
    return lastKnownAuthority.toString();
1✔
552
  }
553

554
  private void onDataRead(int streamId, ByteBuf data, int padding, boolean endOfStream)
555
      throws Http2Exception {
556
    flowControlPing().onDataRead(data.readableBytes(), padding);
1✔
557
    try {
558
      NettyServerStream.TransportState stream = serverStream(requireHttp2Stream(streamId));
1✔
559
      if (stream == null) {
1✔
560
        return;
1✔
561
      }
562
      try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.onDataRead")) {
1✔
563
        PerfMark.attachTag(stream.tag());
1✔
564
        stream.inboundDataReceived(data, endOfStream);
1✔
565
      }
566
    } catch (Throwable e) {
×
567
      logger.log(Level.WARNING, "Exception in onDataRead()", e);
×
568
      // Throw an exception that will get handled by onStreamError.
569
      throw newStreamException(streamId, e);
×
570
    }
1✔
571
  }
1✔
572

573
  private void onRstStreamRead(int streamId, long errorCode) throws Http2Exception {
574
    Http2Exception tooManyRstStream = rstStreamCounter.countRstStream();
1✔
575
    if (tooManyRstStream != null) {
1✔
576
      throw tooManyRstStream;
1✔
577
    }
578

579
    try {
580
      NettyServerStream.TransportState stream = serverStream(connection().stream(streamId));
1✔
581
      if (stream != null) {
1✔
582
        try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.onRstStreamRead")) {
1✔
583
          PerfMark.attachTag(stream.tag());
1✔
584
          stream.transportReportStatus(
1✔
585
              Status.CANCELLED.withDescription("RST_STREAM received for code " + errorCode));
1✔
586
        }
587
      }
588
    } catch (Throwable e) {
×
589
      logger.log(Level.WARNING, "Exception in onRstStreamRead()", e);
×
590
      // Throw an exception that will get handled by onStreamError.
591
      throw newStreamException(streamId, e);
×
592
    }
1✔
593
  }
1✔
594

595
  @Override
596
  protected void onConnectionError(ChannelHandlerContext ctx, boolean outbound, Throwable cause,
597
      Http2Exception http2Ex) {
598
    logger.log(Level.FINE, "Connection Error", cause);
1✔
599
    connectionError = cause;
1✔
600
    super.onConnectionError(ctx, outbound, cause, http2Ex);
1✔
601
  }
1✔
602

603
  @Override
604
  protected void onStreamError(ChannelHandlerContext ctx, boolean outbound, Throwable cause,
605
      StreamException http2Ex) {
606
    NettyServerStream.TransportState serverStream = serverStream(
1✔
607
        connection().stream(Http2Exception.streamId(http2Ex)));
1✔
608
    Level level = Level.WARNING;
1✔
609
    if (serverStream == null && http2Ex.error() == Http2Error.STREAM_CLOSED) {
1✔
610
      level = Level.FINE;
×
611
    }
612
    logger.log(level, "Stream Error", cause);
1✔
613
    Tag tag = serverStream != null ? serverStream.tag() : PerfMark.createTag();
1✔
614
    try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.onStreamError")) {
1✔
615
      PerfMark.attachTag(tag);
1✔
616
      if (serverStream != null) {
1✔
617
        serverStream.transportReportStatus(Utils.statusFromThrowable(cause));
1✔
618
      }
619
      // TODO(ejona): Abort the stream by sending headers to help the client with debugging.
620
      // Delegate to the base class to send a RST_STREAM.
621
      super.onStreamError(ctx, outbound, cause, http2Ex);
1✔
622
    }
623
  }
1✔
624

625
  @Override
626
  public void handleProtocolNegotiationCompleted(
627
      Attributes attrs, InternalChannelz.Security securityInfo) {
628
    negotiationAttributes = attrs;
1✔
629
    this.securityInfo = securityInfo;
1✔
630
    super.handleProtocolNegotiationCompleted(attrs, securityInfo);
1✔
631
    NettyClientHandler.writeBufferingAndRemove(ctx().channel());
1✔
632
  }
1✔
633

634
  @Override
635
  public Attributes getEagAttributes() {
636
    return eagAttributes;
1✔
637
  }
638

639
  InternalChannelz.Security getSecurityInfo() {
640
    return securityInfo;
1✔
641
  }
642

643
  @VisibleForTesting
644
  KeepAliveManager getKeepAliveManagerForTest() {
645
    return keepAliveManager;
1✔
646
  }
647

648
  @VisibleForTesting
649
  void setKeepAliveManagerForTest(KeepAliveManager keepAliveManager) {
650
    this.keepAliveManager = keepAliveManager;
1✔
651
  }
1✔
652

653
  /**
654
   * Handler for the Channel shutting down.
655
   */
656
  @Override
657
  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
658
    try {
659
      if (keepAliveManager != null) {
1✔
660
        keepAliveManager.onTransportTermination();
1✔
661
      }
662
      if (maxConnectionIdleManager != null) {
1✔
663
        maxConnectionIdleManager.onTransportTermination();
1✔
664
      }
665
      if (maxConnectionAgeMonitor != null) {
1✔
666
        maxConnectionAgeMonitor.cancel(false);
1✔
667
      }
668
      final Status status =
1✔
669
          Status.UNAVAILABLE.withDescription("connection terminated for unknown reason");
1✔
670
      // Any streams that are still active must be closed
671
      connection().forEachActiveStream(new Http2StreamVisitor() {
1✔
672
        @Override
673
        public boolean visit(Http2Stream stream) throws Http2Exception {
674
          NettyServerStream.TransportState serverStream = serverStream(stream);
1✔
675
          if (serverStream != null) {
1✔
676
            serverStream.transportReportStatus(status);
1✔
677
          }
678
          return true;
1✔
679
        }
680
      });
681
    } finally {
682
      super.channelInactive(ctx);
1✔
683
    }
684
  }
1✔
685

686
  WriteQueue getWriteQueue() {
687
    return serverWriteQueue;
1✔
688
  }
689

690
  /** Handler for commands sent from the stream. */
691
  @Override
692
  public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
693
      throws Exception {
694
    if (msg instanceof SendGrpcFrameCommand) {
1✔
695
      sendGrpcFrame(ctx, (SendGrpcFrameCommand) msg, promise);
1✔
696
    } else if (msg instanceof SendResponseHeadersCommand) {
1✔
697
      sendResponseHeaders(ctx, (SendResponseHeadersCommand) msg, promise);
1✔
698
    } else if (msg instanceof CancelServerStreamCommand) {
1✔
699
      cancelStream(ctx, (CancelServerStreamCommand) msg, promise);
1✔
700
    } else if (msg instanceof GracefulServerCloseCommand) {
1✔
701
      gracefulClose(ctx, (GracefulServerCloseCommand) msg, promise);
1✔
702
    } else if (msg instanceof ForcefulCloseCommand) {
1✔
703
      forcefulClose(ctx, (ForcefulCloseCommand) msg, promise);
1✔
704
    } else {
705
      AssertionError e =
×
706
          new AssertionError("Write called for unexpected type: " + msg.getClass().getName());
×
707
      ReferenceCountUtil.release(msg);
×
708
      promise.setFailure(e);
×
709
      throw e;
×
710
    }
711
  }
1✔
712

713
  @Override
714
  public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
715
    gracefulClose(ctx, new GracefulServerCloseCommand("app_requested"), promise);
1✔
716
    ctx.flush();
1✔
717
  }
1✔
718

719
  /**
720
   * Returns the given processed bytes back to inbound flow control.
721
   */
722
  void returnProcessedBytes(Http2Stream http2Stream, int bytes) {
723
    try {
724
      decoder().flowController().consumeBytes(http2Stream, bytes);
1✔
725
    } catch (Http2Exception e) {
×
726
      throw new RuntimeException(e);
×
727
    }
1✔
728
  }
1✔
729

730
  private void closeStreamWhenDone(ChannelPromise promise, Http2Stream stream) {
731
    promise.addListener(
1✔
732
        new ChannelFutureListener() {
1✔
733
          @Override
734
          public void operationComplete(ChannelFuture future) {
735
            serverStream(stream).complete();
1✔
736
          }
1✔
737
        });
738
  }
1✔
739

740
  private static void streamGone(int streamId, ChannelPromise promise) {
741
    promise.setFailure(
1✔
742
        new IllegalStateException(
743
            "attempting to write to stream " + streamId + " that no longer exists") {
1✔
744
          @Override
745
          public synchronized Throwable fillInStackTrace() {
746
            return this;
1✔
747
          }
748
        });
749
  }
1✔
750

751
  /** Sends the given gRPC frame to the client. */
752
  private void sendGrpcFrame(
753
      ChannelHandlerContext ctx, SendGrpcFrameCommand cmd, ChannelPromise promise)
754
      throws Http2Exception {
755
    try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.sendGrpcFrame")) {
1✔
756
      PerfMark.attachTag(cmd.stream().tag());
1✔
757
      PerfMark.linkIn(cmd.getLink());
1✔
758
      int streamId = cmd.stream().id();
1✔
759
      Http2Stream stream = connection().stream(streamId);
1✔
760
      if (stream == null) {
1✔
761
        cmd.release();
1✔
762
        streamGone(streamId, promise);
1✔
763
        return;
1✔
764
      }
765
      if (cmd.endStream()) {
1✔
766
        closeStreamWhenDone(promise, stream);
×
767
      }
768
      // Call the base class to write the HTTP/2 DATA frame.
769
      encoder().writeData(ctx, streamId, cmd.content(), 0, cmd.endStream(), promise);
1✔
770
    }
771
  }
1✔
772

773
  /**
774
   * Sends the response headers to the client.
775
   */
776
  private void sendResponseHeaders(ChannelHandlerContext ctx, SendResponseHeadersCommand cmd,
777
      ChannelPromise promise) throws Http2Exception {
778
    try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.sendResponseHeaders")) {
1✔
779
      PerfMark.attachTag(cmd.stream().tag());
1✔
780
      PerfMark.linkIn(cmd.getLink());
1✔
781
      int streamId = cmd.stream().id();
1✔
782
      Http2Stream stream = connection().stream(streamId);
1✔
783
      if (stream == null) {
1✔
784
        streamGone(streamId, promise);
1✔
785
        return;
1✔
786
      }
787
      if (cmd.endOfStream()) {
1✔
788
        closeStreamWhenDone(promise, stream);
1✔
789
      }
790
      encoder().writeHeaders(ctx, streamId, cmd.headers(), 0, cmd.endOfStream(), promise);
1✔
791
    }
792
  }
1✔
793

794
  private void cancelStream(ChannelHandlerContext ctx, CancelServerStreamCommand cmd,
795
      ChannelPromise promise) {
796
    try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.cancelStream")) {
1✔
797
      PerfMark.attachTag(cmd.stream().tag());
1✔
798
      PerfMark.linkIn(cmd.getLink());
1✔
799
      // Notify the listener if we haven't already.
800
      cmd.stream().transportReportStatus(cmd.reason());
1✔
801

802
      // Now we need to decide how we're going to notify the peer that this stream is closed.
803
      // If possible, it's nice to inform the peer _why_ this stream was cancelled by sending
804
      // a structured headers frame.
805
      if (shouldCloseStreamWithHeaders(cmd, connection())) {
1✔
806
        Metadata md = new Metadata();
1✔
807
        md.put(InternalStatus.CODE_KEY, cmd.reason());
1✔
808
        if (cmd.reason().getDescription() != null) {
1✔
809
          md.put(InternalStatus.MESSAGE_KEY, cmd.reason().getDescription());
1✔
810
        }
811
        Http2Headers headers = Utils.convertServerHeaders(md);
1✔
812
        encoder().writeHeaders(
1✔
813
            ctx, cmd.stream().id(), headers, /* padding = */ 0, /* endStream = */ true, promise);
1✔
814
      } else {
1✔
815
        // Terminate the stream.
816
        encoder().writeRstStream(ctx, cmd.stream().id(), Http2Error.CANCEL.code(), promise);
1✔
817
      }
818
    }
819
  }
1✔
820

821
  // Determine whether a CancelServerStreamCommand should try to close the stream with a
822
  // HEADERS or a RST_STREAM frame. The caller has some influence over this (they can
823
  // configure cmd.wantsHeaders()). The state of the stream also has an influence: we
824
  // only try to send HEADERS if the stream exists and hasn't already sent any headers.
825
  private static boolean shouldCloseStreamWithHeaders(
826
          CancelServerStreamCommand cmd, Http2Connection conn) {
827
    if (!cmd.wantsHeaders()) {
1✔
828
      return false;
1✔
829
    }
830
    Http2Stream stream = conn.stream(cmd.stream().id());
1✔
831
    return stream != null && !stream.isHeadersSent();
1✔
832
  }
833

834
  private void gracefulClose(final ChannelHandlerContext ctx, final GracefulServerCloseCommand msg,
835
      ChannelPromise promise) throws Exception {
836
    // Ideally we'd adjust a pre-existing graceful shutdown's grace period to at least what is
837
    // requested here. But that's an edge case and seems bug-prone.
838
    if (gracefulShutdown == null) {
1✔
839
      Long graceTimeInNanos = null;
1✔
840
      if (msg.getGraceTimeUnit() != null) {
1✔
841
        graceTimeInNanos = msg.getGraceTimeUnit().toNanos(msg.getGraceTime());
1✔
842
      }
843
      gracefulShutdown = new GracefulShutdown(msg.getGoAwayDebugString(), graceTimeInNanos);
1✔
844
      gracefulShutdown.start(ctx);
1✔
845
    }
846
    promise.setSuccess();
1✔
847
  }
1✔
848

849
  private void forcefulClose(final ChannelHandlerContext ctx, final ForcefulCloseCommand msg,
850
      ChannelPromise promise) throws Exception {
851
    super.close(ctx, promise);
1✔
852
    connection().forEachActiveStream(new Http2StreamVisitor() {
1✔
853
      @Override
854
      public boolean visit(Http2Stream stream) throws Http2Exception {
855
        NettyServerStream.TransportState serverStream = serverStream(stream);
1✔
856
        if (serverStream != null) {
1✔
857
          try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.forcefulClose")) {
1✔
858
            PerfMark.attachTag(serverStream.tag());
1✔
859
            PerfMark.linkIn(msg.getLink());
1✔
860
            serverStream.transportReportStatus(msg.getStatus());
1✔
861
            resetStream(ctx, stream.id(), Http2Error.CANCEL.code(), ctx.newPromise());
1✔
862
          }
863
        }
864
        stream.close();
1✔
865
        return true;
1✔
866
      }
867
    });
868
  }
1✔
869

870
  private void respondWithHttpError(
871
      ChannelHandlerContext ctx, int streamId, int code, Status.Code statusCode, String msg) {
872
    Metadata metadata = new Metadata();
1✔
873
    metadata.put(InternalStatus.CODE_KEY, statusCode.toStatus());
1✔
874
    metadata.put(InternalStatus.MESSAGE_KEY, msg);
1✔
875
    byte[][] serialized = InternalMetadata.serialize(metadata);
1✔
876

877
    Http2Headers headers = new DefaultHttp2Headers(true, serialized.length / 2)
1✔
878
        .status("" + code)
1✔
879
        .set(CONTENT_TYPE_HEADER, "text/plain; charset=utf-8");
1✔
880
    for (int i = 0; i < serialized.length; i += 2) {
1✔
881
      headers.add(new AsciiString(serialized[i], false), new AsciiString(serialized[i + 1], false));
1✔
882
    }
883
    encoder().writeHeaders(ctx, streamId, headers, 0, false, ctx.newPromise());
1✔
884
    ByteBuf msgBuf = ByteBufUtil.writeUtf8(ctx.alloc(), msg);
1✔
885
    encoder().writeData(ctx, streamId, msgBuf, 0, true, ctx.newPromise());
1✔
886
  }
1✔
887

888
  private Http2Stream requireHttp2Stream(int streamId) {
889
    Http2Stream stream = connection().stream(streamId);
1✔
890
    if (stream == null) {
1✔
891
      // This should never happen.
892
      throw new AssertionError("Stream does not exist: " + streamId);
×
893
    }
894
    return stream;
1✔
895
  }
896

897
  /**
898
   * Returns the server stream associated to the given HTTP/2 stream object.
899
   */
900
  private NettyServerStream.TransportState serverStream(Http2Stream stream) {
901
    return stream == null ? null : (NettyServerStream.TransportState) stream.getProperty(streamKey);
1✔
902
  }
903

904
  private Http2Exception newStreamException(int streamId, Throwable cause) {
905
    return Http2Exception.streamError(
×
906
        streamId, Http2Error.INTERNAL_ERROR, cause, Strings.nullToEmpty(cause.getMessage()));
×
907
  }
908

909
  private class FrameListener extends Http2FrameAdapter {
1✔
910
    private boolean firstSettings = true;
1✔
911

912
    @Override
913
    public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) {
914
      if (firstSettings) {
1✔
915
        firstSettings = false;
1✔
916
        // Delay transportReady until we see the client's HTTP handshake, for coverage with
917
        // handshakeTimeout
918
        attributes = transportListener.transportReady(negotiationAttributes);
1✔
919
      }
920
    }
1✔
921

922
    @Override
923
    public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
924
        boolean endOfStream) throws Http2Exception {
925
      if (keepAliveManager != null) {
1✔
926
        keepAliveManager.onDataReceived();
1✔
927
      }
928
      NettyServerHandler.this.onDataRead(streamId, data, padding, endOfStream);
1✔
929
      return padding;
1✔
930
    }
931

932
    @Override
933
    public void onHeadersRead(ChannelHandlerContext ctx,
934
        int streamId,
935
        Http2Headers headers,
936
        int streamDependency,
937
        short weight,
938
        boolean exclusive,
939
        int padding,
940
        boolean endStream) throws Http2Exception {
941
      if (keepAliveManager != null) {
1✔
942
        keepAliveManager.onDataReceived();
1✔
943
      }
944
      NettyServerHandler.this.onHeadersRead(ctx, streamId, headers);
1✔
945
      if (endStream) {
1✔
946
        NettyServerHandler.this.onDataRead(streamId, Unpooled.EMPTY_BUFFER, 0, endStream);
×
947
      }
948
    }
1✔
949

950
    @Override
951
    public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode)
952
        throws Http2Exception {
953
      if (keepAliveManager != null) {
1✔
954
        keepAliveManager.onDataReceived();
1✔
955
      }
956
      NettyServerHandler.this.onRstStreamRead(streamId, errorCode);
1✔
957
    }
1✔
958

959
    @Override
960
    public void onPingRead(ChannelHandlerContext ctx, long data) throws Http2Exception {
961
      if (keepAliveManager != null) {
1✔
962
        keepAliveManager.onDataReceived();
1✔
963
      }
964
      if (!keepAliveEnforcer.pingAcceptable()) {
1✔
965
        ByteBuf debugData = ByteBufUtil.writeAscii(ctx.alloc(), "too_many_pings");
1✔
966
        goAway(ctx, connection().remote().lastStreamCreated(), Http2Error.ENHANCE_YOUR_CALM.code(),
1✔
967
            debugData, ctx.newPromise());
1✔
968
        Status status = Status.RESOURCE_EXHAUSTED.withDescription("Too many pings from client");
1✔
969
        try {
970
          forcefulClose(ctx, new ForcefulCloseCommand(status), ctx.newPromise());
1✔
971
        } catch (Exception ex) {
×
972
          onError(ctx, /* outbound= */ true, ex);
×
973
        }
1✔
974
      }
975
    }
1✔
976

977
    @Override
978
    public void onPingAckRead(ChannelHandlerContext ctx, long data) throws Http2Exception {
979
      if (keepAliveManager != null) {
1✔
980
        keepAliveManager.onDataReceived();
1✔
981
      }
982
      if (data == flowControlPing().payload()) {
1✔
983
        flowControlPing().updateWindow();
1✔
984
        logger.log(Level.FINE, "Window: {0}",
1✔
985
            decoder().flowController().initialWindowSize(connection().connectionStream()));
1✔
986
      } else if (data == GRACEFUL_SHUTDOWN_PING) {
1✔
987
        if (gracefulShutdown == null) {
1✔
988
          // this should never happen
989
          logger.warning("Received GRACEFUL_SHUTDOWN_PING Ack but gracefulShutdown is null");
×
990
        } else {
991
          gracefulShutdown.secondGoAwayAndClose(ctx);
1✔
992
        }
993
      } else if (data != KEEPALIVE_PING) {
1✔
994
        logger.warning("Received unexpected ping ack. No ping outstanding");
×
995
      }
996
    }
1✔
997
  }
998

999
  private final class KeepAlivePinger implements KeepAliveManager.KeepAlivePinger {
1000
    final ChannelHandlerContext ctx;
1001

1002
    KeepAlivePinger(ChannelHandlerContext ctx) {
1✔
1003
      this.ctx = ctx;
1✔
1004
    }
1✔
1005

1006
    @Override
1007
    public void ping() {
1008
      ChannelFuture pingFuture = encoder().writePing(
1✔
1009
          ctx, false /* isAck */, KEEPALIVE_PING, ctx.newPromise());
1✔
1010
      ctx.flush();
1✔
1011
      pingFuture.addListener(new ChannelFutureListener() {
1✔
1012
        @Override
1013
        public void operationComplete(ChannelFuture future) throws Exception {
1014
          if (future.isSuccess()) {
1✔
1015
            transportTracer.reportKeepAliveSent();
1✔
1016
          }
1017
        }
1✔
1018
      });
1019
    }
1✔
1020

1021
    @Override
1022
    public void onPingTimeout() {
1023
      try {
1024
        forcefulClose(
1✔
1025
            ctx,
1026
            new ForcefulCloseCommand(Status.UNAVAILABLE
1027
                .withDescription("Keepalive failed. The connection is likely gone")),
1✔
1028
            ctx.newPromise());
1✔
1029
      } catch (Exception ex) {
×
1030
        try {
1031
          exceptionCaught(ctx, ex);
×
1032
        } catch (Exception ex2) {
×
1033
          logger.log(Level.WARNING, "Exception while propagating exception", ex2);
×
1034
          logger.log(Level.WARNING, "Original failure", ex);
×
1035
        }
×
1036
      }
1✔
1037
    }
1✔
1038
  }
1039

1040
  private final class GracefulShutdown {
1041
    String goAwayMessage;
1042

1043
    /**
1044
     * The grace time between starting graceful shutdown and closing the netty channel,
1045
     * {@code null} is unspecified.
1046
     */
1047
    @CheckForNull
1048
    Long graceTimeInNanos;
1049

1050
    /**
1051
     * True if ping is Acked or ping is timeout.
1052
     */
1053
    boolean pingAckedOrTimeout;
1054

1055
    Future<?> pingFuture;
1056

1057
    GracefulShutdown(String goAwayMessage,
1058
        @Nullable Long graceTimeInNanos) {
1✔
1059
      this.goAwayMessage = goAwayMessage;
1✔
1060
      this.graceTimeInNanos = graceTimeInNanos;
1✔
1061
    }
1✔
1062

1063
    /**
1064
     * Sends out first GOAWAY and ping, and schedules second GOAWAY and close.
1065
     */
1066
    void start(final ChannelHandlerContext ctx) {
1067
      goAway(
1✔
1068
          ctx,
1069
          Integer.MAX_VALUE,
1070
          Http2Error.NO_ERROR.code(),
1✔
1071
          ByteBufUtil.writeAscii(ctx.alloc(), goAwayMessage),
1✔
1072
          ctx.newPromise());
1✔
1073

1074
      pingFuture = ctx.executor().schedule(
1✔
1075
          new Runnable() {
1✔
1076
            @Override
1077
            public void run() {
1078
              secondGoAwayAndClose(ctx);
1✔
1079
            }
1✔
1080
          },
1081
          GRACEFUL_SHUTDOWN_PING_TIMEOUT_NANOS,
1✔
1082
          TimeUnit.NANOSECONDS);
1083

1084
      encoder().writePing(ctx, false /* isAck */, GRACEFUL_SHUTDOWN_PING, ctx.newPromise());
1✔
1085
    }
1✔
1086

1087
    void secondGoAwayAndClose(ChannelHandlerContext ctx) {
1088
      if (pingAckedOrTimeout) {
1✔
1089
        return;
×
1090
      }
1091
      pingAckedOrTimeout = true;
1✔
1092

1093
      checkNotNull(pingFuture, "pingFuture");
1✔
1094
      pingFuture.cancel(false);
1✔
1095

1096
      // send the second GOAWAY with last stream id
1097
      goAway(
1✔
1098
          ctx,
1099
          connection().remote().lastStreamCreated(),
1✔
1100
          Http2Error.NO_ERROR.code(),
1✔
1101
          ByteBufUtil.writeAscii(ctx.alloc(), goAwayMessage),
1✔
1102
          ctx.newPromise());
1✔
1103

1104
      // gracefully shutdown with specified grace time
1105
      long savedGracefulShutdownTimeMillis = gracefulShutdownTimeoutMillis();
1✔
1106
      long overriddenGraceTime = graceTimeOverrideMillis(savedGracefulShutdownTimeMillis);
1✔
1107
      try {
1108
        gracefulShutdownTimeoutMillis(overriddenGraceTime);
1✔
1109
        NettyServerHandler.super.close(ctx, ctx.newPromise());
1✔
1110
      } catch (Exception e) {
×
1111
        onError(ctx, /* outbound= */ true, e);
×
1112
      } finally {
1113
        gracefulShutdownTimeoutMillis(savedGracefulShutdownTimeMillis);
1✔
1114
      }
1115
    }
1✔
1116

1117
    private long graceTimeOverrideMillis(long originalMillis) {
1118
      if (graceTimeInNanos == null) {
1✔
1119
        return originalMillis;
1✔
1120
      }
1121
      if (graceTimeInNanos == MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE) {
1✔
1122
        // netty treats -1 as "no timeout"
1123
        return -1L;
1✔
1124
      }
1125
      return TimeUnit.NANOSECONDS.toMillis(graceTimeInNanos);
1✔
1126
    }
1127
  }
1128

1129
  // Use a frame writer so that we know when frames are through flow control and actually being
1130
  // written.
1131
  private static class WriteMonitoringFrameWriter extends DecoratingHttp2FrameWriter {
1132
    private final KeepAliveEnforcer keepAliveEnforcer;
1133

1134
    public WriteMonitoringFrameWriter(Http2FrameWriter delegate,
1135
        KeepAliveEnforcer keepAliveEnforcer) {
1136
      super(delegate);
1✔
1137
      this.keepAliveEnforcer = keepAliveEnforcer;
1✔
1138
    }
1✔
1139

1140
    @Override
1141
    public ChannelFuture writeData(ChannelHandlerContext ctx, int streamId, ByteBuf data,
1142
        int padding, boolean endStream, ChannelPromise promise) {
1143
      keepAliveEnforcer.resetCounters();
1✔
1144
      return super.writeData(ctx, streamId, data, padding, endStream, promise);
1✔
1145
    }
1146

1147
    @Override
1148
    public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
1149
        int padding, boolean endStream, ChannelPromise promise) {
1150
      keepAliveEnforcer.resetCounters();
1✔
1151
      return super.writeHeaders(ctx, streamId, headers, padding, endStream, promise);
1✔
1152
    }
1153

1154
    @Override
1155
    public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
1156
        int streamDependency, short weight, boolean exclusive, int padding, boolean endStream,
1157
        ChannelPromise promise) {
1158
      keepAliveEnforcer.resetCounters();
×
1159
      return super.writeHeaders(ctx, streamId, headers, streamDependency, weight, exclusive,
×
1160
          padding, endStream, promise);
1161
    }
1162
  }
1163

1164
  private static final class Http2RstCounterEncoder extends DecoratingHttp2ConnectionEncoder {
1165
    private final RstStreamCounter rstStreamCounter;
1166
    private Http2LifecycleManager lifecycleManager;
1167

1168
    Http2RstCounterEncoder(Http2ConnectionEncoder encoder, RstStreamCounter rstStreamCounter) {
1169
      super(encoder);
1✔
1170
      this.rstStreamCounter = rstStreamCounter;
1✔
1171
    }
1✔
1172

1173
    @Override
1174
    public void lifecycleManager(Http2LifecycleManager lifecycleManager) {
1175
      this.lifecycleManager = lifecycleManager;
1✔
1176
      super.lifecycleManager(lifecycleManager);
1✔
1177
    }
1✔
1178

1179
    @Override
1180
    public ChannelFuture writeRstStream(
1181
        ChannelHandlerContext ctx, int streamId, long errorCode, ChannelPromise promise) {
1182
      ChannelFuture future = super.writeRstStream(ctx, streamId, errorCode, promise);
1✔
1183
      // We want to count "induced" RST_STREAM, where the server sent a reset because of a malformed
1184
      // frame.
1185
      boolean normalRst
1✔
1186
          = errorCode == Http2Error.NO_ERROR.code() || errorCode == Http2Error.CANCEL.code();
1✔
1187
      if (!normalRst) {
1✔
1188
        Http2Exception tooManyRstStream = rstStreamCounter.countRstStream();
1✔
1189
        if (tooManyRstStream != null) {
1✔
1190
          lifecycleManager.onError(ctx, true, tooManyRstStream);
1✔
1191
          ctx.close();
1✔
1192
        }
1193
      }
1194
      return future;
1✔
1195
    }
1196
  }
1197

1198
  private static final class RstStreamCounter {
1199
    private final int maxRstCount;
1200
    private final long maxRstPeriodNanos;
1201
    private final Ticker ticker;
1202
    private int rstCount;
1203
    private long lastRstNanoTime;
1204

1205
    RstStreamCounter(int maxRstCount, long maxRstPeriodNanos, Ticker ticker) {
1✔
1206
      checkArgument(maxRstCount >= 0, "maxRstCount must be non-negative: %s", maxRstCount);
1✔
1207
      this.maxRstCount = maxRstCount;
1✔
1208
      this.maxRstPeriodNanos = maxRstPeriodNanos;
1✔
1209
      this.ticker = checkNotNull(ticker, "ticker");
1✔
1210
      this.lastRstNanoTime = ticker.read();
1✔
1211
    }
1✔
1212

1213
    /** Returns non-{@code null} when the connection should be killed by the caller. */
1214
    private Http2Exception countRstStream() {
1215
      if (maxRstCount == 0) {
1✔
1216
        return null;
1✔
1217
      }
1218
      long now = ticker.read();
1✔
1219
      if (now - lastRstNanoTime > maxRstPeriodNanos) {
1✔
1220
        lastRstNanoTime = now;
1✔
1221
        rstCount = 1;
1✔
1222
      } else {
1223
        rstCount++;
1✔
1224
        if (rstCount > maxRstCount) {
1✔
1225
          return new Http2Exception(Http2Error.ENHANCE_YOUR_CALM, "too_many_rststreams") {
1✔
1226
            @SuppressWarnings("UnsynchronizedOverridesSynchronized") // No memory accesses
1227
            @Override
1228
            public Throwable fillInStackTrace() {
1229
              // Avoid the CPU cycles, since the resets may be a CPU consumption attack
1230
              return this;
1✔
1231
            }
1232
          };
1233
        }
1234
      }
1235
      return null;
1✔
1236
    }
1237
  }
1238

1239
  private static class ServerChannelLogger extends ChannelLogger {
1240
    private static final Logger log = Logger.getLogger(ChannelLogger.class.getName());
1✔
1241

1242
    @Override
1243
    public void log(ChannelLogLevel level, String message) {
1244
      log.log(toJavaLogLevel(level), message);
1✔
1245
    }
1✔
1246

1247
    @Override
1248
    public void log(ChannelLogLevel level, String messageFormat, Object... args) {
1249
      log(level, MessageFormat.format(messageFormat, args));
1✔
1250
    }
1✔
1251
  }
1252

1253
  private static Level toJavaLogLevel(ChannelLogLevel level) {
1254
    switch (level) {
1✔
1255
      case ERROR:
1256
        return Level.FINE;
×
1257
      case WARNING:
1258
        return Level.FINER;
×
1259
      default:
1260
        return Level.FINEST;
1✔
1261
    }
1262
  }
1263
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc