• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #18910

20 Nov 2023 10:05PM UTC coverage: 88.226% (+0.03%) from 88.201%
#18910

push

github

ejona86
util: Remove shutdown subchannels from OD tracking (#10683)

An OutlierDetectionLoadBalancer child load balancer might decide to
shut down any subchannel it is tracking. We need to make sure that those
subchannels are removed from the outlier detection tracker map to avoid
a memory leak.

30370 of 34423 relevant lines covered (88.23%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.7
/../netty/src/main/java/io/grpc/netty/NettyServerHandler.java
1
/*
2
 * Copyright 2014 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.netty;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static io.grpc.internal.GrpcUtil.SERVER_KEEPALIVE_TIME_NANOS_DISABLED;
22
import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE;
23
import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_AGE_NANOS_DISABLED;
24
import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_IDLE_NANOS_DISABLED;
25
import static io.grpc.netty.Utils.CONTENT_TYPE_HEADER;
26
import static io.grpc.netty.Utils.HTTP_METHOD;
27
import static io.grpc.netty.Utils.TE_HEADER;
28
import static io.grpc.netty.Utils.TE_TRAILERS;
29
import static io.netty.handler.codec.http.HttpHeaderNames.CONNECTION;
30
import static io.netty.handler.codec.http.HttpHeaderNames.HOST;
31
import static io.netty.handler.codec.http2.DefaultHttp2LocalFlowController.DEFAULT_WINDOW_UPDATE_RATIO;
32
import static io.netty.handler.codec.http2.Http2Headers.PseudoHeaderName.AUTHORITY;
33

34
import com.google.common.annotations.VisibleForTesting;
35
import com.google.common.base.Preconditions;
36
import com.google.common.base.Strings;
37
import com.google.common.base.Ticker;
38
import io.grpc.Attributes;
39
import io.grpc.ChannelLogger;
40
import io.grpc.ChannelLogger.ChannelLogLevel;
41
import io.grpc.InternalChannelz;
42
import io.grpc.InternalMetadata;
43
import io.grpc.InternalStatus;
44
import io.grpc.Metadata;
45
import io.grpc.ServerStreamTracer;
46
import io.grpc.Status;
47
import io.grpc.internal.GrpcUtil;
48
import io.grpc.internal.KeepAliveEnforcer;
49
import io.grpc.internal.KeepAliveManager;
50
import io.grpc.internal.LogExceptionRunnable;
51
import io.grpc.internal.MaxConnectionIdleManager;
52
import io.grpc.internal.ServerTransportListener;
53
import io.grpc.internal.StatsTraceContext;
54
import io.grpc.internal.TransportTracer;
55
import io.grpc.netty.GrpcHttp2HeadersUtils.GrpcHttp2ServerHeadersDecoder;
56
import io.netty.buffer.ByteBuf;
57
import io.netty.buffer.ByteBufUtil;
58
import io.netty.buffer.Unpooled;
59
import io.netty.channel.ChannelFuture;
60
import io.netty.channel.ChannelFutureListener;
61
import io.netty.channel.ChannelHandlerContext;
62
import io.netty.channel.ChannelPromise;
63
import io.netty.handler.codec.http2.DecoratingHttp2FrameWriter;
64
import io.netty.handler.codec.http2.DefaultHttp2Connection;
65
import io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder;
66
import io.netty.handler.codec.http2.DefaultHttp2ConnectionEncoder;
67
import io.netty.handler.codec.http2.DefaultHttp2FrameReader;
68
import io.netty.handler.codec.http2.DefaultHttp2FrameWriter;
69
import io.netty.handler.codec.http2.DefaultHttp2Headers;
70
import io.netty.handler.codec.http2.DefaultHttp2LocalFlowController;
71
import io.netty.handler.codec.http2.DefaultHttp2RemoteFlowController;
72
import io.netty.handler.codec.http2.Http2Connection;
73
import io.netty.handler.codec.http2.Http2ConnectionAdapter;
74
import io.netty.handler.codec.http2.Http2ConnectionDecoder;
75
import io.netty.handler.codec.http2.Http2ConnectionEncoder;
76
import io.netty.handler.codec.http2.Http2Error;
77
import io.netty.handler.codec.http2.Http2Exception;
78
import io.netty.handler.codec.http2.Http2Exception.StreamException;
79
import io.netty.handler.codec.http2.Http2FrameAdapter;
80
import io.netty.handler.codec.http2.Http2FrameLogger;
81
import io.netty.handler.codec.http2.Http2FrameReader;
82
import io.netty.handler.codec.http2.Http2FrameWriter;
83
import io.netty.handler.codec.http2.Http2Headers;
84
import io.netty.handler.codec.http2.Http2HeadersDecoder;
85
import io.netty.handler.codec.http2.Http2InboundFrameLogger;
86
import io.netty.handler.codec.http2.Http2OutboundFrameLogger;
87
import io.netty.handler.codec.http2.Http2Settings;
88
import io.netty.handler.codec.http2.Http2Stream;
89
import io.netty.handler.codec.http2.Http2StreamVisitor;
90
import io.netty.handler.codec.http2.WeightedFairQueueByteDistributor;
91
import io.netty.handler.logging.LogLevel;
92
import io.netty.util.AsciiString;
93
import io.netty.util.ReferenceCountUtil;
94
import io.perfmark.PerfMark;
95
import io.perfmark.Tag;
96
import io.perfmark.TaskCloseable;
97
import java.text.MessageFormat;
98
import java.util.List;
99
import java.util.concurrent.Future;
100
import java.util.concurrent.ScheduledFuture;
101
import java.util.concurrent.TimeUnit;
102
import java.util.logging.Level;
103
import java.util.logging.Logger;
104
import javax.annotation.CheckForNull;
105
import javax.annotation.Nullable;
106

107
/**
108
 * Server-side Netty handler for GRPC processing. All event handlers are executed entirely within
109
 * the context of the Netty Channel thread.
110
 */
111
class NettyServerHandler extends AbstractNettyHandler {
112
  private static final Logger logger = Logger.getLogger(NettyServerHandler.class.getName());
1✔
113
  private static final long KEEPALIVE_PING = 0xDEADL;
114
  @VisibleForTesting
115
  static final long GRACEFUL_SHUTDOWN_PING = 0x97ACEF001L;
116
  private static final long GRACEFUL_SHUTDOWN_PING_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(10);
1✔
117
  /** Temporary workaround for #8674. Fine to delete after v1.45 release, and maybe earlier. */
118
  private static final boolean DISABLE_CONNECTION_HEADER_CHECK = Boolean.parseBoolean(
1✔
119
      System.getProperty("io.grpc.netty.disableConnectionHeaderCheck", "false"));
1✔
120

121
  private final Http2Connection.PropertyKey streamKey;
122
  private final ServerTransportListener transportListener;
123
  private final int maxMessageSize;
124
  private final long keepAliveTimeInNanos;
125
  private final long keepAliveTimeoutInNanos;
126
  private final long maxConnectionAgeInNanos;
127
  private final long maxConnectionAgeGraceInNanos;
128
  private final int maxRstCount;
129
  private final long maxRstPeriodNanos;
130
  private final List<? extends ServerStreamTracer.Factory> streamTracerFactories;
131
  private final TransportTracer transportTracer;
132
  private final KeepAliveEnforcer keepAliveEnforcer;
133
  private final Attributes eagAttributes;
134
  private final Ticker ticker;
135
  /** Incomplete attributes produced by negotiator. */
136
  private Attributes negotiationAttributes;
137
  private InternalChannelz.Security securityInfo;
138
  /** Completed attributes produced by transportReady. */
139
  private Attributes attributes;
140
  private Throwable connectionError;
141
  private boolean teWarningLogged;
142
  private WriteQueue serverWriteQueue;
143
  private AsciiString lastKnownAuthority;
144
  @CheckForNull
145
  private KeepAliveManager keepAliveManager;
146
  @CheckForNull
147
  private MaxConnectionIdleManager maxConnectionIdleManager;
148
  @CheckForNull
149
  private ScheduledFuture<?> maxConnectionAgeMonitor;
150
  @CheckForNull
151
  private GracefulShutdown gracefulShutdown;
152
  private int rstCount;
153
  private long lastRstNanoTime;
154

155

156
  static NettyServerHandler newHandler(
157
      ServerTransportListener transportListener,
158
      ChannelPromise channelUnused,
159
      List<? extends ServerStreamTracer.Factory> streamTracerFactories,
160
      TransportTracer transportTracer,
161
      int maxStreams,
162
      boolean autoFlowControl,
163
      int flowControlWindow,
164
      int maxHeaderListSize,
165
      int maxMessageSize,
166
      long keepAliveTimeInNanos,
167
      long keepAliveTimeoutInNanos,
168
      long maxConnectionIdleInNanos,
169
      long maxConnectionAgeInNanos,
170
      long maxConnectionAgeGraceInNanos,
171
      boolean permitKeepAliveWithoutCalls,
172
      long permitKeepAliveTimeInNanos,
173
      int maxRstCount,
174
      long maxRstPeriodNanos,
175
      Attributes eagAttributes) {
176
    Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive: %s",
1✔
177
        maxHeaderListSize);
178
    Http2FrameLogger frameLogger = new Http2FrameLogger(LogLevel.DEBUG, NettyServerHandler.class);
1✔
179
    Http2HeadersDecoder headersDecoder = new GrpcHttp2ServerHeadersDecoder(maxHeaderListSize);
1✔
180
    Http2FrameReader frameReader = new Http2InboundFrameLogger(
1✔
181
        new DefaultHttp2FrameReader(headersDecoder), frameLogger);
182
    Http2FrameWriter frameWriter =
1✔
183
        new Http2OutboundFrameLogger(new DefaultHttp2FrameWriter(), frameLogger);
184
    return newHandler(
1✔
185
        channelUnused,
186
        frameReader,
187
        frameWriter,
188
        transportListener,
189
        streamTracerFactories,
190
        transportTracer,
191
        maxStreams,
192
        autoFlowControl,
193
        flowControlWindow,
194
        maxHeaderListSize,
195
        maxMessageSize,
196
        keepAliveTimeInNanos,
197
        keepAliveTimeoutInNanos,
198
        maxConnectionIdleInNanos,
199
        maxConnectionAgeInNanos,
200
        maxConnectionAgeGraceInNanos,
201
        permitKeepAliveWithoutCalls,
202
        permitKeepAliveTimeInNanos,
203
        maxRstCount,
204
        maxRstPeriodNanos,
205
        eagAttributes,
206
        Ticker.systemTicker());
1✔
207
  }
208

209
  static NettyServerHandler newHandler(
210
      ChannelPromise channelUnused,
211
      Http2FrameReader frameReader,
212
      Http2FrameWriter frameWriter,
213
      ServerTransportListener transportListener,
214
      List<? extends ServerStreamTracer.Factory> streamTracerFactories,
215
      TransportTracer transportTracer,
216
      int maxStreams,
217
      boolean autoFlowControl,
218
      int flowControlWindow,
219
      int maxHeaderListSize,
220
      int maxMessageSize,
221
      long keepAliveTimeInNanos,
222
      long keepAliveTimeoutInNanos,
223
      long maxConnectionIdleInNanos,
224
      long maxConnectionAgeInNanos,
225
      long maxConnectionAgeGraceInNanos,
226
      boolean permitKeepAliveWithoutCalls,
227
      long permitKeepAliveTimeInNanos,
228
      int maxRstCount,
229
      long maxRstPeriodNanos,
230
      Attributes eagAttributes,
231
      Ticker ticker) {
232
    Preconditions.checkArgument(maxStreams > 0, "maxStreams must be positive: %s", maxStreams);
1✔
233
    Preconditions.checkArgument(flowControlWindow > 0, "flowControlWindow must be positive: %s",
1✔
234
        flowControlWindow);
235
    Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive: %s",
1✔
236
        maxHeaderListSize);
237
    Preconditions.checkArgument(maxMessageSize > 0, "maxMessageSize must be positive: %s",
1✔
238
        maxMessageSize);
239

240
    final Http2Connection connection = new DefaultHttp2Connection(true);
1✔
241
    WeightedFairQueueByteDistributor dist = new WeightedFairQueueByteDistributor(connection);
1✔
242
    dist.allocationQuantum(16 * 1024); // Make benchmarks fast again.
1✔
243
    DefaultHttp2RemoteFlowController controller =
1✔
244
        new DefaultHttp2RemoteFlowController(connection, dist);
245
    connection.remote().flowController(controller);
1✔
246
    final KeepAliveEnforcer keepAliveEnforcer = new KeepAliveEnforcer(
1✔
247
        permitKeepAliveWithoutCalls, permitKeepAliveTimeInNanos, TimeUnit.NANOSECONDS);
248

249
    // Create the local flow controller configured to auto-refill the connection window.
250
    connection.local().flowController(
1✔
251
        new DefaultHttp2LocalFlowController(connection, DEFAULT_WINDOW_UPDATE_RATIO, true));
252
    frameWriter = new WriteMonitoringFrameWriter(frameWriter, keepAliveEnforcer);
1✔
253
    Http2ConnectionEncoder encoder =
1✔
254
        new DefaultHttp2ConnectionEncoder(connection, frameWriter);
255
    encoder = new Http2ControlFrameLimitEncoder(encoder, 10000);
1✔
256
    Http2ConnectionDecoder decoder = new DefaultHttp2ConnectionDecoder(connection, encoder,
1✔
257
        frameReader);
258

259
    Http2Settings settings = new Http2Settings();
1✔
260
    settings.initialWindowSize(flowControlWindow);
1✔
261
    settings.maxConcurrentStreams(maxStreams);
1✔
262
    settings.maxHeaderListSize(maxHeaderListSize);
1✔
263

264
    if (ticker == null) {
1✔
265
      ticker = Ticker.systemTicker();
×
266
    }
267

268
    return new NettyServerHandler(
1✔
269
        channelUnused,
270
        connection,
271
        transportListener,
272
        streamTracerFactories,
273
        transportTracer,
274
        decoder, encoder, settings,
275
        maxMessageSize,
276
        keepAliveTimeInNanos, keepAliveTimeoutInNanos,
277
        maxConnectionIdleInNanos,
278
        maxConnectionAgeInNanos, maxConnectionAgeGraceInNanos,
279
        keepAliveEnforcer,
280
        autoFlowControl,
281
        maxRstCount,
282
        maxRstPeriodNanos,
283
        eagAttributes, ticker);
284
  }
285

286
  /**
   * Stores the transport configuration, registers a connection listener that reports
   * active/idle transitions, and installs the HTTP/2 frame listener on the decoder.
   *
   * <p>The connection listener notifies the keep-alive enforcer (and the max-idle manager, when
   * one exists) when the number of active streams becomes one on activation or zero on close.
   *
   * @throws IllegalArgumentException if {@code maxMessageSize} is negative
   * @throws NullPointerException if {@code keepAliveEnforcer}, {@code eagAttributes},
   *     {@code ticker}, {@code transportListener}, {@code streamTracerFactories} or
   *     {@code transportTracer} is {@code null}
   */
  private NettyServerHandler(
      ChannelPromise channelUnused,
      final Http2Connection connection,
      ServerTransportListener transportListener,
      List<? extends ServerStreamTracer.Factory> streamTracerFactories,
      TransportTracer transportTracer,
      Http2ConnectionDecoder decoder,
      Http2ConnectionEncoder encoder,
      Http2Settings settings,
      int maxMessageSize,
      long keepAliveTimeInNanos,
      long keepAliveTimeoutInNanos,
      long maxConnectionIdleInNanos,
      long maxConnectionAgeInNanos,
      long maxConnectionAgeGraceInNanos,
      final KeepAliveEnforcer keepAliveEnforcer,
      boolean autoFlowControl,
      int maxRstCount,
      long maxRstPeriodNanos,
      Attributes eagAttributes,
      Ticker ticker) {
    super(channelUnused, decoder, encoder, settings, new ServerChannelLogger(),
        autoFlowControl, null, ticker);

    // A disabled idle limit is represented by the absence of a manager.
    final MaxConnectionIdleManager idleManager =
        maxConnectionIdleInNanos == MAX_CONNECTION_IDLE_NANOS_DISABLED
            ? null
            : new MaxConnectionIdleManager(maxConnectionIdleInNanos);

    connection.addListener(new Http2ConnectionAdapter() {
      @Override
      public void onStreamActive(Http2Stream stream) {
        // Only the transition to exactly one active stream counts as becoming active.
        if (connection.numActiveStreams() != 1) {
          return;
        }
        keepAliveEnforcer.onTransportActive();
        if (idleManager != null) {
          idleManager.onTransportActive();
        }
      }

      @Override
      public void onStreamClosed(Http2Stream stream) {
        // Only the transition to zero active streams counts as becoming idle.
        if (connection.numActiveStreams() != 0) {
          return;
        }
        keepAliveEnforcer.onTransportIdle();
        if (idleManager != null) {
          idleManager.onTransportIdle();
        }
      }
    });

    checkArgument(maxMessageSize >= 0, "maxMessageSize must be non-negative: %s", maxMessageSize);
    this.maxMessageSize = maxMessageSize;
    this.keepAliveTimeInNanos = keepAliveTimeInNanos;
    this.keepAliveTimeoutInNanos = keepAliveTimeoutInNanos;
    this.maxConnectionIdleManager = idleManager;
    this.maxConnectionAgeInNanos = maxConnectionAgeInNanos;
    this.maxConnectionAgeGraceInNanos = maxConnectionAgeGraceInNanos;
    this.keepAliveEnforcer = checkNotNull(keepAliveEnforcer, "keepAliveEnforcer");
    this.maxRstCount = maxRstCount;
    this.maxRstPeriodNanos = maxRstPeriodNanos;
    this.eagAttributes = checkNotNull(eagAttributes, "eagAttributes");
    this.ticker = checkNotNull(ticker, "ticker");

    // The RST_STREAM counting window starts at construction time.
    this.lastRstNanoTime = ticker.read();
    this.streamKey = encoder.connection().newKey();
    this.transportListener = checkNotNull(transportListener, "transportListener");
    this.streamTracerFactories = checkNotNull(streamTracerFactories, "streamTracerFactories");
    this.transportTracer = checkNotNull(transportTracer, "transportTracer");

    // Set the frame listener on the decoder.
    decoder().frameListener(new FrameListener());
  }
1✔
360

361
  @Nullable
362
  Throwable connectionError() {
363
    return connectionError;
1✔
364
  }
365

366
  @Override
367
  public void handlerAdded(final ChannelHandlerContext ctx) throws Exception {
368
    serverWriteQueue = new WriteQueue(ctx.channel());
1✔
369

370
    // init max connection age monitor
371
    if (maxConnectionAgeInNanos != MAX_CONNECTION_AGE_NANOS_DISABLED) {
1✔
372
      maxConnectionAgeMonitor = ctx.executor().schedule(
1✔
373
          new LogExceptionRunnable(new Runnable() {
1✔
374
            @Override
375
            public void run() {
376
              if (gracefulShutdown == null) {
1✔
377
                gracefulShutdown = new GracefulShutdown("max_age", maxConnectionAgeGraceInNanos);
1✔
378
                gracefulShutdown.start(ctx);
1✔
379
                ctx.flush();
1✔
380
              }
381
            }
1✔
382
          }),
383
          maxConnectionAgeInNanos,
384
          TimeUnit.NANOSECONDS);
385
    }
386

387
    if (maxConnectionIdleManager != null) {
1✔
388
      maxConnectionIdleManager.start(new Runnable() {
1✔
389
        @Override
390
        public void run() {
391
          if (gracefulShutdown == null) {
1✔
392
            gracefulShutdown = new GracefulShutdown("max_idle", null);
1✔
393
            gracefulShutdown.start(ctx);
1✔
394
            ctx.flush();
1✔
395
          }
396
        }
1✔
397
      }, ctx.executor());
1✔
398
    }
399

400
    if (keepAliveTimeInNanos != SERVER_KEEPALIVE_TIME_NANOS_DISABLED) {
1✔
401
      keepAliveManager = new KeepAliveManager(new KeepAlivePinger(ctx), ctx.executor(),
1✔
402
          keepAliveTimeInNanos, keepAliveTimeoutInNanos, true /* keepAliveDuringTransportIdle */);
403
      keepAliveManager.onTransportStarted();
1✔
404
    }
405

406
    assert encoder().connection().equals(decoder().connection());
1✔
407
    transportTracer.setFlowControlWindowReader(new Utils.FlowControlReader(encoder().connection()));
1✔
408

409
    super.handlerAdded(ctx);
1✔
410
  }
1✔
411

412
  private void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers)
413
      throws Http2Exception {
414
    try {
415
      // Connection-specific header fields makes a request malformed. Ideally this would be handled
416
      // by Netty. RFC 7540 section 8.1.2.2
417
      if (!DISABLE_CONNECTION_HEADER_CHECK && headers.contains(CONNECTION)) {
1✔
418
        resetStream(ctx, streamId, Http2Error.PROTOCOL_ERROR.code(), ctx.newPromise());
×
419
        return;
×
420
      }
421

422
      if (headers.authority() == null) {
1✔
423
        List<CharSequence> hosts = headers.getAll(HOST);
1✔
424
        if (hosts.size() > 1) {
1✔
425
          // RFC 7230 section 5.4
426
          respondWithHttpError(ctx, streamId, 400, Status.Code.INTERNAL,
1✔
427
              "Multiple host headers");
428
          return;
1✔
429
        }
430
        if (!hosts.isEmpty()) {
1✔
431
          headers.add(AUTHORITY.value(), hosts.get(0));
1✔
432
        }
433
      }
434
      headers.remove(HOST);
1✔
435

436
      // Remove the leading slash of the path and get the fully qualified method name
437
      CharSequence path = headers.path();
1✔
438

439
      if (path == null) {
1✔
440
        respondWithHttpError(ctx, streamId, 404, Status.Code.UNIMPLEMENTED,
1✔
441
            "Expected path but is missing");
442
        return;
1✔
443
      }
444

445
      if (path.charAt(0) != '/') {
1✔
446
        respondWithHttpError(ctx, streamId, 404, Status.Code.UNIMPLEMENTED,
1✔
447
            String.format("Expected path to start with /: %s", path));
1✔
448
        return;
1✔
449
      }
450

451
      String method = path.subSequence(1, path.length()).toString();
1✔
452

453
      // Verify that the Content-Type is correct in the request.
454
      CharSequence contentType = headers.get(CONTENT_TYPE_HEADER);
1✔
455
      if (contentType == null) {
1✔
456
        respondWithHttpError(
1✔
457
            ctx, streamId, 415, Status.Code.INTERNAL, "Content-Type is missing from the request");
458
        return;
1✔
459
      }
460
      String contentTypeString = contentType.toString();
1✔
461
      if (!GrpcUtil.isGrpcContentType(contentTypeString)) {
1✔
462
        respondWithHttpError(ctx, streamId, 415, Status.Code.INTERNAL,
1✔
463
            String.format("Content-Type '%s' is not supported", contentTypeString));
1✔
464
        return;
1✔
465
      }
466

467
      if (!HTTP_METHOD.contentEquals(headers.method())) {
1✔
468
        respondWithHttpError(ctx, streamId, 405, Status.Code.INTERNAL,
1✔
469
            String.format("Method '%s' is not supported", headers.method()));
1✔
470
        return;
1✔
471
      }
472

473
      if (!teWarningLogged && !TE_TRAILERS.contentEquals(headers.get(TE_HEADER))) {
1✔
474
        logger.warning(String.format("Expected header TE: %s, but %s is received. This means "
1✔
475
                + "some intermediate proxy may not support trailers",
476
            TE_TRAILERS, headers.get(TE_HEADER)));
1✔
477
        teWarningLogged = true;
1✔
478
      }
479

480
      // The Http2Stream object was put by AbstractHttp2ConnectionHandler before calling this
481
      // method.
482
      Http2Stream http2Stream = requireHttp2Stream(streamId);
1✔
483

484
      Metadata metadata = Utils.convertHeaders(headers);
1✔
485
      StatsTraceContext statsTraceCtx =
1✔
486
          StatsTraceContext.newServerContext(streamTracerFactories, method, metadata);
1✔
487

488
      NettyServerStream.TransportState state = new NettyServerStream.TransportState(
1✔
489
          this,
490
          ctx.channel().eventLoop(),
1✔
491
          http2Stream,
492
          maxMessageSize,
493
          statsTraceCtx,
494
          transportTracer,
495
          method);
496

497
      try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.onHeadersRead")) {
1✔
498
        PerfMark.attachTag(state.tag());
1✔
499
        String authority = getOrUpdateAuthority((AsciiString) headers.authority());
1✔
500
        NettyServerStream stream = new NettyServerStream(
1✔
501
            ctx.channel(),
1✔
502
            state,
503
            attributes,
504
            authority,
505
            statsTraceCtx,
506
            transportTracer);
507
        transportListener.streamCreated(stream, method, metadata);
1✔
508
        state.onStreamAllocated();
1✔
509
        http2Stream.setProperty(streamKey, state);
1✔
510
      }
511
    } catch (Exception e) {
×
512
      logger.log(Level.WARNING, "Exception in onHeadersRead()", e);
×
513
      // Throw an exception that will get handled by onStreamError.
514
      throw newStreamException(streamId, e);
×
515
    }
1✔
516
  }
1✔
517

518
  private String getOrUpdateAuthority(AsciiString authority) {
519
    if (authority == null) {
1✔
520
      return null;
1✔
521
    } else if (!authority.equals(lastKnownAuthority)) {
1✔
522
      lastKnownAuthority = authority;
1✔
523
    }
524

525
    // AsciiString.toString() is internally cached, so subsequent calls will not
526
    // result in recomputing the String representation of lastKnownAuthority.
527
    return lastKnownAuthority.toString();
1✔
528
  }
529

530
  private void onDataRead(int streamId, ByteBuf data, int padding, boolean endOfStream)
531
      throws Http2Exception {
532
    flowControlPing().onDataRead(data.readableBytes(), padding);
1✔
533
    try {
534
      NettyServerStream.TransportState stream = serverStream(requireHttp2Stream(streamId));
1✔
535
      if (stream == null) {
1✔
536
        return;
1✔
537
      }
538
      try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.onDataRead")) {
1✔
539
        PerfMark.attachTag(stream.tag());
1✔
540
        stream.inboundDataReceived(data, endOfStream);
1✔
541
      }
542
    } catch (Throwable e) {
×
543
      logger.log(Level.WARNING, "Exception in onDataRead()", e);
×
544
      // Throw an exception that will get handled by onStreamError.
545
      throw newStreamException(streamId, e);
×
546
    }
1✔
547
  }
1✔
548

549
  private void onRstStreamRead(int streamId, long errorCode) throws Http2Exception {
550
    if (maxRstCount > 0) {
1✔
551
      long now = ticker.read();
1✔
552
      if (now - lastRstNanoTime > maxRstPeriodNanos) {
1✔
553
        lastRstNanoTime = now;
1✔
554
        rstCount = 1;
1✔
555
      } else {
556
        rstCount++;
1✔
557
        if (rstCount > maxRstCount) {
1✔
558
          throw new Http2Exception(Http2Error.ENHANCE_YOUR_CALM, "too_many_rststreams") {
1✔
559
            @SuppressWarnings("UnsynchronizedOverridesSynchronized") // No memory accesses
560
            @Override
561
            public Throwable fillInStackTrace() {
562
              // Avoid the CPU cycles, since the resets may be a CPU consumption attack
563
              return this;
1✔
564
            }
565
          };
566
        }
567
      }
568
    }
569

570
    try {
571
      NettyServerStream.TransportState stream = serverStream(connection().stream(streamId));
1✔
572
      if (stream != null) {
1✔
573
        try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.onRstStreamRead")) {
1✔
574
          PerfMark.attachTag(stream.tag());
1✔
575
          stream.transportReportStatus(
1✔
576
              Status.CANCELLED.withDescription("RST_STREAM received for code " + errorCode));
1✔
577
        }
578
      }
579
    } catch (Throwable e) {
1✔
580
      logger.log(Level.WARNING, "Exception in onRstStreamRead()", e);
1✔
581
      // Throw an exception that will get handled by onStreamError.
582
      throw newStreamException(streamId, e);
1✔
583
    }
1✔
584
  }
1✔
585

586
  @Override
587
  protected void onConnectionError(ChannelHandlerContext ctx, boolean outbound, Throwable cause,
588
      Http2Exception http2Ex) {
589
    logger.log(Level.FINE, "Connection Error", cause);
1✔
590
    connectionError = cause;
1✔
591
    super.onConnectionError(ctx, outbound, cause, http2Ex);
1✔
592
  }
1✔
593

594
  @Override
595
  protected void onStreamError(ChannelHandlerContext ctx, boolean outbound, Throwable cause,
596
      StreamException http2Ex) {
597
    NettyServerStream.TransportState serverStream = serverStream(
1✔
598
        connection().stream(Http2Exception.streamId(http2Ex)));
1✔
599
    Level level = Level.WARNING;
1✔
600
    if (serverStream == null && http2Ex.error() == Http2Error.STREAM_CLOSED) {
1✔
601
      level = Level.FINE;
×
602
    }
603
    logger.log(level, "Stream Error", cause);
1✔
604
    Tag tag = serverStream != null ? serverStream.tag() : PerfMark.createTag();
1✔
605
    try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.onStreamError")) {
1✔
606
      PerfMark.attachTag(tag);
1✔
607
      if (serverStream != null) {
1✔
608
        serverStream.transportReportStatus(Utils.statusFromThrowable(cause));
1✔
609
      }
610
      // TODO(ejona): Abort the stream by sending headers to help the client with debugging.
611
      // Delegate to the base class to send a RST_STREAM.
612
      super.onStreamError(ctx, outbound, cause, http2Ex);
1✔
613
    }
614
  }
1✔
615

616
  @Override
617
  public void handleProtocolNegotiationCompleted(
618
      Attributes attrs, InternalChannelz.Security securityInfo) {
619
    negotiationAttributes = attrs;
1✔
620
    this.securityInfo = securityInfo;
1✔
621
    super.handleProtocolNegotiationCompleted(attrs, securityInfo);
1✔
622
    NettyClientHandler.writeBufferingAndRemove(ctx().channel());
1✔
623
  }
1✔
624

625
  @Override
626
  public Attributes getEagAttributes() {
627
    return eagAttributes;
1✔
628
  }
629

630
  InternalChannelz.Security getSecurityInfo() {
631
    return securityInfo;
1✔
632
  }
633

634
  @VisibleForTesting
635
  KeepAliveManager getKeepAliveManagerForTest() {
636
    return keepAliveManager;
1✔
637
  }
638

639
  @VisibleForTesting
640
  void setKeepAliveManagerForTest(KeepAliveManager keepAliveManager) {
641
    this.keepAliveManager = keepAliveManager;
1✔
642
  }
1✔
643

644
  /**
645
   * Handler for the Channel shutting down.
646
   */
647
  @Override
648
  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
649
    try {
650
      if (keepAliveManager != null) {
1✔
651
        keepAliveManager.onTransportTermination();
1✔
652
      }
653
      if (maxConnectionIdleManager != null) {
1✔
654
        maxConnectionIdleManager.onTransportTermination();
1✔
655
      }
656
      if (maxConnectionAgeMonitor != null) {
1✔
657
        maxConnectionAgeMonitor.cancel(false);
1✔
658
      }
659
      final Status status =
1✔
660
          Status.UNAVAILABLE.withDescription("connection terminated for unknown reason");
1✔
661
      // Any streams that are still active must be closed
662
      connection().forEachActiveStream(new Http2StreamVisitor() {
1✔
663
        @Override
664
        public boolean visit(Http2Stream stream) throws Http2Exception {
665
          NettyServerStream.TransportState serverStream = serverStream(stream);
1✔
666
          if (serverStream != null) {
1✔
667
            serverStream.transportReportStatus(status);
1✔
668
          }
669
          return true;
1✔
670
        }
671
      });
672
    } finally {
673
      super.channelInactive(ctx);
1✔
674
    }
675
  }
1✔
676

677
  WriteQueue getWriteQueue() {
678
    return serverWriteQueue;
1✔
679
  }
680

681
  /**
682
   * Handler for commands sent from the stream.
683
   */
684
  @Override
685
  public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
686
      throws Exception {
687
    if (msg instanceof SendGrpcFrameCommand) {
1✔
688
      sendGrpcFrame(ctx, (SendGrpcFrameCommand) msg, promise);
1✔
689
    } else if (msg instanceof SendResponseHeadersCommand) {
1✔
690
      sendResponseHeaders(ctx, (SendResponseHeadersCommand) msg, promise);
1✔
691
    } else if (msg instanceof CancelServerStreamCommand) {
1✔
692
      cancelStream(ctx, (CancelServerStreamCommand) msg, promise);
1✔
693
    } else if (msg instanceof GracefulServerCloseCommand) {
1✔
694
      gracefulClose(ctx, (GracefulServerCloseCommand) msg, promise);
1✔
695
    } else if (msg instanceof ForcefulCloseCommand) {
1✔
696
      forcefulClose(ctx, (ForcefulCloseCommand) msg, promise);
1✔
697
    } else {
698
      AssertionError e =
×
699
          new AssertionError("Write called for unexpected type: " + msg.getClass().getName());
×
700
      ReferenceCountUtil.release(msg);
×
701
      promise.setFailure(e);
×
702
      throw e;
×
703
    }
704
  }
1✔
705

706
  @Override
707
  public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
708
    gracefulClose(ctx, new GracefulServerCloseCommand("app_requested"), promise);
1✔
709
    ctx.flush();
1✔
710
  }
1✔
711

712
  /**
713
   * Returns the given processed bytes back to inbound flow control.
714
   */
715
  void returnProcessedBytes(Http2Stream http2Stream, int bytes) {
716
    try {
717
      decoder().flowController().consumeBytes(http2Stream, bytes);
1✔
718
    } catch (Http2Exception e) {
×
719
      throw new RuntimeException(e);
×
720
    }
1✔
721
  }
1✔
722

723
  private void closeStreamWhenDone(ChannelPromise promise, int streamId) throws Http2Exception {
724
    final NettyServerStream.TransportState stream = serverStream(requireHttp2Stream(streamId));
1✔
725
    if (stream != null) {
1✔
726
      promise.addListener(new ChannelFutureListener() {
1✔
727
        @Override
728
        public void operationComplete(ChannelFuture future) {
729
          stream.complete();
1✔
730
        }
1✔
731
      });
732
    }
733
  }
1✔
734

735
  /**
736
   * Sends the given gRPC frame to the client.
737
   */
738
  private void sendGrpcFrame(ChannelHandlerContext ctx, SendGrpcFrameCommand cmd,
739
      ChannelPromise promise) throws Http2Exception {
740
    try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.sendGrpcFrame")) {
1✔
741
      PerfMark.attachTag(cmd.stream().tag());
1✔
742
      PerfMark.linkIn(cmd.getLink());
1✔
743
      if (cmd.endStream()) {
1✔
744
        closeStreamWhenDone(promise, cmd.stream().id());
×
745
      }
746
      // Call the base class to write the HTTP/2 DATA frame.
747
      encoder().writeData(ctx, cmd.stream().id(), cmd.content(), 0, cmd.endStream(), promise);
1✔
748
    }
749
  }
1✔
750

751
  /**
752
   * Sends the response headers to the client.
753
   */
754
  private void sendResponseHeaders(ChannelHandlerContext ctx, SendResponseHeadersCommand cmd,
755
      ChannelPromise promise) throws Http2Exception {
756
    try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.sendResponseHeaders")) {
1✔
757
      PerfMark.attachTag(cmd.stream().tag());
1✔
758
      PerfMark.linkIn(cmd.getLink());
1✔
759
      // TODO(carl-mastrangelo): remove this check once https://github.com/netty/netty/issues/6296
760
      // is fixed.
761
      int streamId = cmd.stream().id();
1✔
762
      Http2Stream stream = connection().stream(streamId);
1✔
763
      if (stream == null) {
1✔
764
        resetStream(ctx, streamId, Http2Error.CANCEL.code(), promise);
1✔
765
        return;
1✔
766
      }
767
      if (cmd.endOfStream()) {
1✔
768
        closeStreamWhenDone(promise, streamId);
1✔
769
      }
770
      encoder().writeHeaders(ctx, streamId, cmd.headers(), 0, cmd.endOfStream(), promise);
1✔
771
    }
772
  }
1✔
773

774
  private void cancelStream(ChannelHandlerContext ctx, CancelServerStreamCommand cmd,
775
      ChannelPromise promise) {
776
    try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.cancelStream")) {
1✔
777
      PerfMark.attachTag(cmd.stream().tag());
1✔
778
      PerfMark.linkIn(cmd.getLink());
1✔
779
      // Notify the listener if we haven't already.
780
      cmd.stream().transportReportStatus(cmd.reason());
1✔
781
      // Terminate the stream.
782
      encoder().writeRstStream(ctx, cmd.stream().id(), Http2Error.CANCEL.code(), promise);
1✔
783
    }
784
  }
1✔
785

786
  private void gracefulClose(final ChannelHandlerContext ctx, final GracefulServerCloseCommand msg,
787
      ChannelPromise promise) throws Exception {
788
    // Ideally we'd adjust a pre-existing graceful shutdown's grace period to at least what is
789
    // requested here. But that's an edge case and seems bug-prone.
790
    if (gracefulShutdown == null) {
1✔
791
      Long graceTimeInNanos = null;
1✔
792
      if (msg.getGraceTimeUnit() != null) {
1✔
793
        graceTimeInNanos = msg.getGraceTimeUnit().toNanos(msg.getGraceTime());
1✔
794
      }
795
      gracefulShutdown = new GracefulShutdown(msg.getGoAwayDebugString(), graceTimeInNanos);
1✔
796
      gracefulShutdown.start(ctx);
1✔
797
    }
798
    promise.setSuccess();
1✔
799
  }
1✔
800

801
  private void forcefulClose(final ChannelHandlerContext ctx, final ForcefulCloseCommand msg,
802
      ChannelPromise promise) throws Exception {
803
    super.close(ctx, promise);
1✔
804
    connection().forEachActiveStream(new Http2StreamVisitor() {
1✔
805
      @Override
806
      public boolean visit(Http2Stream stream) throws Http2Exception {
807
        NettyServerStream.TransportState serverStream = serverStream(stream);
1✔
808
        if (serverStream != null) {
1✔
809
          try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.forcefulClose")) {
1✔
810
            PerfMark.attachTag(serverStream.tag());
1✔
811
            PerfMark.linkIn(msg.getLink());
1✔
812
            serverStream.transportReportStatus(msg.getStatus());
1✔
813
            resetStream(ctx, stream.id(), Http2Error.CANCEL.code(), ctx.newPromise());
1✔
814
          }
815
        }
816
        stream.close();
1✔
817
        return true;
1✔
818
      }
819
    });
820
  }
1✔
821

822
  private void respondWithHttpError(
823
      ChannelHandlerContext ctx, int streamId, int code, Status.Code statusCode, String msg) {
824
    Metadata metadata = new Metadata();
1✔
825
    metadata.put(InternalStatus.CODE_KEY, statusCode.toStatus());
1✔
826
    metadata.put(InternalStatus.MESSAGE_KEY, msg);
1✔
827
    byte[][] serialized = InternalMetadata.serialize(metadata);
1✔
828

829
    Http2Headers headers = new DefaultHttp2Headers(true, serialized.length / 2)
1✔
830
        .status("" + code)
1✔
831
        .set(CONTENT_TYPE_HEADER, "text/plain; charset=utf-8");
1✔
832
    for (int i = 0; i < serialized.length; i += 2) {
1✔
833
      headers.add(new AsciiString(serialized[i], false), new AsciiString(serialized[i + 1], false));
1✔
834
    }
835
    encoder().writeHeaders(ctx, streamId, headers, 0, false, ctx.newPromise());
1✔
836
    ByteBuf msgBuf = ByteBufUtil.writeUtf8(ctx.alloc(), msg);
1✔
837
    encoder().writeData(ctx, streamId, msgBuf, 0, true, ctx.newPromise());
1✔
838
  }
1✔
839

840
  private Http2Stream requireHttp2Stream(int streamId) {
841
    Http2Stream stream = connection().stream(streamId);
1✔
842
    if (stream == null) {
1✔
843
      // This should never happen.
844
      throw new AssertionError("Stream does not exist: " + streamId);
×
845
    }
846
    return stream;
1✔
847
  }
848

849
  /**
850
   * Returns the server stream associated to the given HTTP/2 stream object.
851
   */
852
  private NettyServerStream.TransportState serverStream(Http2Stream stream) {
853
    return stream == null ? null : (NettyServerStream.TransportState) stream.getProperty(streamKey);
1✔
854
  }
855

856
  private Http2Exception newStreamException(int streamId, Throwable cause) {
857
    return Http2Exception.streamError(
1✔
858
        streamId, Http2Error.INTERNAL_ERROR, cause, Strings.nullToEmpty(cause.getMessage()));
1✔
859
  }
860

861
  private class FrameListener extends Http2FrameAdapter {
1✔
862
    private boolean firstSettings = true;
1✔
863

864
    @Override
865
    public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) {
866
      if (firstSettings) {
1✔
867
        firstSettings = false;
1✔
868
        // Delay transportReady until we see the client's HTTP handshake, for coverage with
869
        // handshakeTimeout
870
        attributes = transportListener.transportReady(negotiationAttributes);
1✔
871
      }
872
    }
1✔
873

874
    @Override
875
    public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
876
        boolean endOfStream) throws Http2Exception {
877
      if (keepAliveManager != null) {
1✔
878
        keepAliveManager.onDataReceived();
1✔
879
      }
880
      NettyServerHandler.this.onDataRead(streamId, data, padding, endOfStream);
1✔
881
      return padding;
1✔
882
    }
883

884
    @Override
885
    public void onHeadersRead(ChannelHandlerContext ctx,
886
        int streamId,
887
        Http2Headers headers,
888
        int streamDependency,
889
        short weight,
890
        boolean exclusive,
891
        int padding,
892
        boolean endStream) throws Http2Exception {
893
      if (keepAliveManager != null) {
1✔
894
        keepAliveManager.onDataReceived();
1✔
895
      }
896
      NettyServerHandler.this.onHeadersRead(ctx, streamId, headers);
1✔
897
      if (endStream) {
1✔
898
        NettyServerHandler.this.onDataRead(streamId, Unpooled.EMPTY_BUFFER, 0, endStream);
×
899
      }
900
    }
1✔
901

902
    @Override
903
    public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode)
904
        throws Http2Exception {
905
      if (keepAliveManager != null) {
1✔
906
        keepAliveManager.onDataReceived();
1✔
907
      }
908
      NettyServerHandler.this.onRstStreamRead(streamId, errorCode);
1✔
909
    }
1✔
910

911
    @Override
912
    public void onPingRead(ChannelHandlerContext ctx, long data) throws Http2Exception {
913
      if (keepAliveManager != null) {
1✔
914
        keepAliveManager.onDataReceived();
1✔
915
      }
916
      if (!keepAliveEnforcer.pingAcceptable()) {
1✔
917
        ByteBuf debugData = ByteBufUtil.writeAscii(ctx.alloc(), "too_many_pings");
1✔
918
        goAway(ctx, connection().remote().lastStreamCreated(), Http2Error.ENHANCE_YOUR_CALM.code(),
1✔
919
            debugData, ctx.newPromise());
1✔
920
        Status status = Status.RESOURCE_EXHAUSTED.withDescription("Too many pings from client");
1✔
921
        try {
922
          forcefulClose(ctx, new ForcefulCloseCommand(status), ctx.newPromise());
1✔
923
        } catch (Exception ex) {
×
924
          onError(ctx, /* outbound= */ true, ex);
×
925
        }
1✔
926
      }
927
    }
1✔
928

929
    @Override
930
    public void onPingAckRead(ChannelHandlerContext ctx, long data) throws Http2Exception {
931
      if (keepAliveManager != null) {
1✔
932
        keepAliveManager.onDataReceived();
1✔
933
      }
934
      if (data == flowControlPing().payload()) {
1✔
935
        flowControlPing().updateWindow();
1✔
936
        logger.log(Level.FINE, "Window: {0}",
1✔
937
            decoder().flowController().initialWindowSize(connection().connectionStream()));
1✔
938
      } else if (data == GRACEFUL_SHUTDOWN_PING) {
1✔
939
        if (gracefulShutdown == null) {
1✔
940
          // this should never happen
941
          logger.warning("Received GRACEFUL_SHUTDOWN_PING Ack but gracefulShutdown is null");
×
942
        } else {
943
          gracefulShutdown.secondGoAwayAndClose(ctx);
1✔
944
        }
945
      } else if (data != KEEPALIVE_PING) {
1✔
946
        logger.warning("Received unexpected ping ack. No ping outstanding");
×
947
      }
948
    }
1✔
949
  }
950

951
  private final class KeepAlivePinger implements KeepAliveManager.KeepAlivePinger {
952
    final ChannelHandlerContext ctx;
953

954
    KeepAlivePinger(ChannelHandlerContext ctx) {
1✔
955
      this.ctx = ctx;
1✔
956
    }
1✔
957

958
    @Override
959
    public void ping() {
960
      ChannelFuture pingFuture = encoder().writePing(
1✔
961
          ctx, false /* isAck */, KEEPALIVE_PING, ctx.newPromise());
1✔
962
      ctx.flush();
1✔
963
      pingFuture.addListener(new ChannelFutureListener() {
1✔
964
        @Override
965
        public void operationComplete(ChannelFuture future) throws Exception {
966
          if (future.isSuccess()) {
1✔
967
            transportTracer.reportKeepAliveSent();
1✔
968
          }
969
        }
1✔
970
      });
971
    }
1✔
972

973
    @Override
974
    public void onPingTimeout() {
975
      try {
976
        forcefulClose(
1✔
977
            ctx,
978
            new ForcefulCloseCommand(Status.UNAVAILABLE
979
                .withDescription("Keepalive failed. The connection is likely gone")),
1✔
980
            ctx.newPromise());
1✔
981
      } catch (Exception ex) {
×
982
        try {
983
          exceptionCaught(ctx, ex);
×
984
        } catch (Exception ex2) {
×
985
          logger.log(Level.WARNING, "Exception while propagating exception", ex2);
×
986
          logger.log(Level.WARNING, "Original failure", ex);
×
987
        }
×
988
      }
1✔
989
    }
1✔
990
  }
991

992
  private final class GracefulShutdown {
993
    String goAwayMessage;
994

995
    /**
996
     * The grace time between starting graceful shutdown and closing the netty channel,
997
     * {@code null} is unspecified.
998
     */
999
    @CheckForNull
1000
    Long graceTimeInNanos;
1001

1002
    /**
1003
     * True if ping is Acked or ping is timeout.
1004
     */
1005
    boolean pingAckedOrTimeout;
1006

1007
    Future<?> pingFuture;
1008

1009
    GracefulShutdown(String goAwayMessage,
1010
        @Nullable Long graceTimeInNanos) {
1✔
1011
      this.goAwayMessage = goAwayMessage;
1✔
1012
      this.graceTimeInNanos = graceTimeInNanos;
1✔
1013
    }
1✔
1014

1015
    /**
1016
     * Sends out first GOAWAY and ping, and schedules second GOAWAY and close.
1017
     */
1018
    void start(final ChannelHandlerContext ctx) {
1019
      goAway(
1✔
1020
          ctx,
1021
          Integer.MAX_VALUE,
1022
          Http2Error.NO_ERROR.code(),
1✔
1023
          ByteBufUtil.writeAscii(ctx.alloc(), goAwayMessage),
1✔
1024
          ctx.newPromise());
1✔
1025

1026
      pingFuture = ctx.executor().schedule(
1✔
1027
          new Runnable() {
1✔
1028
            @Override
1029
            public void run() {
1030
              secondGoAwayAndClose(ctx);
1✔
1031
            }
1✔
1032
          },
1033
          GRACEFUL_SHUTDOWN_PING_TIMEOUT_NANOS,
1✔
1034
          TimeUnit.NANOSECONDS);
1035

1036
      encoder().writePing(ctx, false /* isAck */, GRACEFUL_SHUTDOWN_PING, ctx.newPromise());
1✔
1037
    }
1✔
1038

1039
    void secondGoAwayAndClose(ChannelHandlerContext ctx) {
1040
      if (pingAckedOrTimeout) {
1✔
1041
        return;
×
1042
      }
1043
      pingAckedOrTimeout = true;
1✔
1044

1045
      checkNotNull(pingFuture, "pingFuture");
1✔
1046
      pingFuture.cancel(false);
1✔
1047

1048
      // send the second GOAWAY with last stream id
1049
      goAway(
1✔
1050
          ctx,
1051
          connection().remote().lastStreamCreated(),
1✔
1052
          Http2Error.NO_ERROR.code(),
1✔
1053
          ByteBufUtil.writeAscii(ctx.alloc(), goAwayMessage),
1✔
1054
          ctx.newPromise());
1✔
1055

1056
      // gracefully shutdown with specified grace time
1057
      long savedGracefulShutdownTimeMillis = gracefulShutdownTimeoutMillis();
1✔
1058
      long overriddenGraceTime = graceTimeOverrideMillis(savedGracefulShutdownTimeMillis);
1✔
1059
      try {
1060
        gracefulShutdownTimeoutMillis(overriddenGraceTime);
1✔
1061
        NettyServerHandler.super.close(ctx, ctx.newPromise());
1✔
1062
      } catch (Exception e) {
×
1063
        onError(ctx, /* outbound= */ true, e);
×
1064
      } finally {
1065
        gracefulShutdownTimeoutMillis(savedGracefulShutdownTimeMillis);
1✔
1066
      }
1067
    }
1✔
1068

1069
    private long graceTimeOverrideMillis(long originalMillis) {
1070
      if (graceTimeInNanos == null) {
1✔
1071
        return originalMillis;
1✔
1072
      }
1073
      if (graceTimeInNanos == MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE) {
1✔
1074
        // netty treats -1 as "no timeout"
1075
        return -1L;
1✔
1076
      }
1077
      return TimeUnit.NANOSECONDS.toMillis(graceTimeInNanos);
1✔
1078
    }
1079
  }
1080

1081
  // Use a frame writer so that we know when frames are through flow control and actually being
1082
  // written.
1083
  private static class WriteMonitoringFrameWriter extends DecoratingHttp2FrameWriter {
1084
    private final KeepAliveEnforcer keepAliveEnforcer;
1085

1086
    public WriteMonitoringFrameWriter(Http2FrameWriter delegate,
1087
        KeepAliveEnforcer keepAliveEnforcer) {
1088
      super(delegate);
1✔
1089
      this.keepAliveEnforcer = keepAliveEnforcer;
1✔
1090
    }
1✔
1091

1092
    @Override
1093
    public ChannelFuture writeData(ChannelHandlerContext ctx, int streamId, ByteBuf data,
1094
        int padding, boolean endStream, ChannelPromise promise) {
1095
      keepAliveEnforcer.resetCounters();
1✔
1096
      return super.writeData(ctx, streamId, data, padding, endStream, promise);
1✔
1097
    }
1098

1099
    @Override
1100
    public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
1101
        int padding, boolean endStream, ChannelPromise promise) {
1102
      keepAliveEnforcer.resetCounters();
1✔
1103
      return super.writeHeaders(ctx, streamId, headers, padding, endStream, promise);
1✔
1104
    }
1105

1106
    @Override
1107
    public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
1108
        int streamDependency, short weight, boolean exclusive, int padding, boolean endStream,
1109
        ChannelPromise promise) {
1110
      keepAliveEnforcer.resetCounters();
×
1111
      return super.writeHeaders(ctx, streamId, headers, streamDependency, weight, exclusive,
×
1112
          padding, endStream, promise);
1113
    }
1114
  }
1115

1116
  private static class ServerChannelLogger extends ChannelLogger {
1117
    private static final Logger log = Logger.getLogger(ChannelLogger.class.getName());
1✔
1118

1119
    @Override
1120
    public void log(ChannelLogLevel level, String message) {
1121
      log.log(toJavaLogLevel(level), message);
1✔
1122
    }
1✔
1123

1124
    @Override
1125
    public void log(ChannelLogLevel level, String messageFormat, Object... args) {
1126
      log(level, MessageFormat.format(messageFormat, args));
1✔
1127
    }
1✔
1128
  }
1129

1130
  private static Level toJavaLogLevel(ChannelLogLevel level) {
1131
    switch (level) {
1✔
1132
      case ERROR:
1133
        return Level.FINE;
×
1134
      case WARNING:
1135
        return Level.FINER;
×
1136
      default:
1137
        return Level.FINEST;
1✔
1138
    }
1139
  }
1140
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc