• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19167

22 Apr 2024 02:48PM UTC coverage: 88.084% (-0.01%) from 88.096%
#19167

push

github

ejona86
util: Remove deactivation and GracefulSwitchLb from MultiChildLb

It is easy to manage these things outside of MultiChildLb and it makes
the shared code easier and use less memory. In particular, we don't want
to use many instances of GracefulSwitchLb in virtually every policy
simply because it was needed in one or two cases.

31195 of 35415 relevant lines covered (88.08%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.85
/../netty/src/main/java/io/grpc/netty/NettyServerHandler.java
1
/*
2
 * Copyright 2014 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.netty;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static io.grpc.internal.GrpcUtil.SERVER_KEEPALIVE_TIME_NANOS_DISABLED;
22
import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE;
23
import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_AGE_NANOS_DISABLED;
24
import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_IDLE_NANOS_DISABLED;
25
import static io.grpc.netty.Utils.CONTENT_TYPE_HEADER;
26
import static io.grpc.netty.Utils.HTTP_METHOD;
27
import static io.grpc.netty.Utils.TE_HEADER;
28
import static io.grpc.netty.Utils.TE_TRAILERS;
29
import static io.netty.handler.codec.http.HttpHeaderNames.CONNECTION;
30
import static io.netty.handler.codec.http.HttpHeaderNames.HOST;
31
import static io.netty.handler.codec.http2.DefaultHttp2LocalFlowController.DEFAULT_WINDOW_UPDATE_RATIO;
32
import static io.netty.handler.codec.http2.Http2Headers.PseudoHeaderName.AUTHORITY;
33

34
import com.google.common.annotations.VisibleForTesting;
35
import com.google.common.base.Preconditions;
36
import com.google.common.base.Strings;
37
import com.google.common.base.Ticker;
38
import io.grpc.Attributes;
39
import io.grpc.ChannelLogger;
40
import io.grpc.ChannelLogger.ChannelLogLevel;
41
import io.grpc.InternalChannelz;
42
import io.grpc.InternalMetadata;
43
import io.grpc.InternalStatus;
44
import io.grpc.Metadata;
45
import io.grpc.ServerStreamTracer;
46
import io.grpc.Status;
47
import io.grpc.internal.GrpcUtil;
48
import io.grpc.internal.KeepAliveEnforcer;
49
import io.grpc.internal.KeepAliveManager;
50
import io.grpc.internal.LogExceptionRunnable;
51
import io.grpc.internal.MaxConnectionIdleManager;
52
import io.grpc.internal.ServerTransportListener;
53
import io.grpc.internal.StatsTraceContext;
54
import io.grpc.internal.TransportTracer;
55
import io.grpc.netty.GrpcHttp2HeadersUtils.GrpcHttp2ServerHeadersDecoder;
56
import io.netty.buffer.ByteBuf;
57
import io.netty.buffer.ByteBufUtil;
58
import io.netty.buffer.Unpooled;
59
import io.netty.channel.ChannelFuture;
60
import io.netty.channel.ChannelFutureListener;
61
import io.netty.channel.ChannelHandlerContext;
62
import io.netty.channel.ChannelPromise;
63
import io.netty.handler.codec.http2.DecoratingHttp2FrameWriter;
64
import io.netty.handler.codec.http2.DefaultHttp2Connection;
65
import io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder;
66
import io.netty.handler.codec.http2.DefaultHttp2ConnectionEncoder;
67
import io.netty.handler.codec.http2.DefaultHttp2FrameReader;
68
import io.netty.handler.codec.http2.DefaultHttp2FrameWriter;
69
import io.netty.handler.codec.http2.DefaultHttp2Headers;
70
import io.netty.handler.codec.http2.DefaultHttp2LocalFlowController;
71
import io.netty.handler.codec.http2.DefaultHttp2RemoteFlowController;
72
import io.netty.handler.codec.http2.Http2Connection;
73
import io.netty.handler.codec.http2.Http2ConnectionAdapter;
74
import io.netty.handler.codec.http2.Http2ConnectionDecoder;
75
import io.netty.handler.codec.http2.Http2ConnectionEncoder;
76
import io.netty.handler.codec.http2.Http2Error;
77
import io.netty.handler.codec.http2.Http2Exception;
78
import io.netty.handler.codec.http2.Http2Exception.StreamException;
79
import io.netty.handler.codec.http2.Http2FrameAdapter;
80
import io.netty.handler.codec.http2.Http2FrameLogger;
81
import io.netty.handler.codec.http2.Http2FrameReader;
82
import io.netty.handler.codec.http2.Http2FrameWriter;
83
import io.netty.handler.codec.http2.Http2Headers;
84
import io.netty.handler.codec.http2.Http2HeadersDecoder;
85
import io.netty.handler.codec.http2.Http2InboundFrameLogger;
86
import io.netty.handler.codec.http2.Http2OutboundFrameLogger;
87
import io.netty.handler.codec.http2.Http2Settings;
88
import io.netty.handler.codec.http2.Http2Stream;
89
import io.netty.handler.codec.http2.Http2StreamVisitor;
90
import io.netty.handler.codec.http2.WeightedFairQueueByteDistributor;
91
import io.netty.handler.logging.LogLevel;
92
import io.netty.util.AsciiString;
93
import io.netty.util.ReferenceCountUtil;
94
import io.perfmark.PerfMark;
95
import io.perfmark.Tag;
96
import io.perfmark.TaskCloseable;
97
import java.text.MessageFormat;
98
import java.util.List;
99
import java.util.concurrent.Future;
100
import java.util.concurrent.ScheduledFuture;
101
import java.util.concurrent.TimeUnit;
102
import java.util.logging.Level;
103
import java.util.logging.Logger;
104
import javax.annotation.CheckForNull;
105
import javax.annotation.Nullable;
106

107
/**
108
 * Server-side Netty handler for GRPC processing. All event handlers are executed entirely within
109
 * the context of the Netty Channel thread.
110
 */
111
class NettyServerHandler extends AbstractNettyHandler {
112
  private static final Logger logger = Logger.getLogger(NettyServerHandler.class.getName());
1✔
113
  private static final long KEEPALIVE_PING = 0xDEADL;
114
  @VisibleForTesting
115
  static final long GRACEFUL_SHUTDOWN_PING = 0x97ACEF001L;
116
  private static final long GRACEFUL_SHUTDOWN_PING_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(10);
1✔
117
  /** Temporary workaround for #8674. Fine to delete after v1.45 release, and maybe earlier. */
118
  private static final boolean DISABLE_CONNECTION_HEADER_CHECK = Boolean.parseBoolean(
1✔
119
      System.getProperty("io.grpc.netty.disableConnectionHeaderCheck", "false"));
1✔
120

121
  private final Http2Connection.PropertyKey streamKey;
122
  private final ServerTransportListener transportListener;
123
  private final int maxMessageSize;
124
  private final long keepAliveTimeInNanos;
125
  private final long keepAliveTimeoutInNanos;
126
  private final long maxConnectionAgeInNanos;
127
  private final long maxConnectionAgeGraceInNanos;
128
  private final int maxRstCount;
129
  private final long maxRstPeriodNanos;
130
  private final List<? extends ServerStreamTracer.Factory> streamTracerFactories;
131
  private final TransportTracer transportTracer;
132
  private final KeepAliveEnforcer keepAliveEnforcer;
133
  private final Attributes eagAttributes;
134
  private final Ticker ticker;
135
  /** Incomplete attributes produced by negotiator. */
136
  private Attributes negotiationAttributes;
137
  private InternalChannelz.Security securityInfo;
138
  /** Completed attributes produced by transportReady. */
139
  private Attributes attributes;
140
  private Throwable connectionError;
141
  private boolean teWarningLogged;
142
  private WriteQueue serverWriteQueue;
143
  private AsciiString lastKnownAuthority;
144
  @CheckForNull
145
  private KeepAliveManager keepAliveManager;
146
  @CheckForNull
147
  private MaxConnectionIdleManager maxConnectionIdleManager;
148
  @CheckForNull
149
  private ScheduledFuture<?> maxConnectionAgeMonitor;
150
  @CheckForNull
151
  private GracefulShutdown gracefulShutdown;
152
  private int rstCount;
153
  private long lastRstNanoTime;
154

155

156
  static NettyServerHandler newHandler(
157
      ServerTransportListener transportListener,
158
      ChannelPromise channelUnused,
159
      List<? extends ServerStreamTracer.Factory> streamTracerFactories,
160
      TransportTracer transportTracer,
161
      int maxStreams,
162
      boolean autoFlowControl,
163
      int flowControlWindow,
164
      int maxHeaderListSize,
165
      int maxMessageSize,
166
      long keepAliveTimeInNanos,
167
      long keepAliveTimeoutInNanos,
168
      long maxConnectionIdleInNanos,
169
      long maxConnectionAgeInNanos,
170
      long maxConnectionAgeGraceInNanos,
171
      boolean permitKeepAliveWithoutCalls,
172
      long permitKeepAliveTimeInNanos,
173
      int maxRstCount,
174
      long maxRstPeriodNanos,
175
      Attributes eagAttributes) {
176
    Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive: %s",
1✔
177
        maxHeaderListSize);
178
    Http2FrameLogger frameLogger = new Http2FrameLogger(LogLevel.DEBUG, NettyServerHandler.class);
1✔
179
    Http2HeadersDecoder headersDecoder = new GrpcHttp2ServerHeadersDecoder(maxHeaderListSize);
1✔
180
    Http2FrameReader frameReader = new Http2InboundFrameLogger(
1✔
181
        new DefaultHttp2FrameReader(headersDecoder), frameLogger);
182
    Http2FrameWriter frameWriter =
1✔
183
        new Http2OutboundFrameLogger(new DefaultHttp2FrameWriter(), frameLogger);
184
    return newHandler(
1✔
185
        channelUnused,
186
        frameReader,
187
        frameWriter,
188
        transportListener,
189
        streamTracerFactories,
190
        transportTracer,
191
        maxStreams,
192
        autoFlowControl,
193
        flowControlWindow,
194
        maxHeaderListSize,
195
        maxMessageSize,
196
        keepAliveTimeInNanos,
197
        keepAliveTimeoutInNanos,
198
        maxConnectionIdleInNanos,
199
        maxConnectionAgeInNanos,
200
        maxConnectionAgeGraceInNanos,
201
        permitKeepAliveWithoutCalls,
202
        permitKeepAliveTimeInNanos,
203
        maxRstCount,
204
        maxRstPeriodNanos,
205
        eagAttributes,
206
        Ticker.systemTicker());
1✔
207
  }
208

209
  static NettyServerHandler newHandler(
210
      ChannelPromise channelUnused,
211
      Http2FrameReader frameReader,
212
      Http2FrameWriter frameWriter,
213
      ServerTransportListener transportListener,
214
      List<? extends ServerStreamTracer.Factory> streamTracerFactories,
215
      TransportTracer transportTracer,
216
      int maxStreams,
217
      boolean autoFlowControl,
218
      int flowControlWindow,
219
      int maxHeaderListSize,
220
      int maxMessageSize,
221
      long keepAliveTimeInNanos,
222
      long keepAliveTimeoutInNanos,
223
      long maxConnectionIdleInNanos,
224
      long maxConnectionAgeInNanos,
225
      long maxConnectionAgeGraceInNanos,
226
      boolean permitKeepAliveWithoutCalls,
227
      long permitKeepAliveTimeInNanos,
228
      int maxRstCount,
229
      long maxRstPeriodNanos,
230
      Attributes eagAttributes,
231
      Ticker ticker) {
232
    Preconditions.checkArgument(maxStreams > 0, "maxStreams must be positive: %s", maxStreams);
1✔
233
    Preconditions.checkArgument(flowControlWindow > 0, "flowControlWindow must be positive: %s",
1✔
234
        flowControlWindow);
235
    Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive: %s",
1✔
236
        maxHeaderListSize);
237
    Preconditions.checkArgument(maxMessageSize > 0, "maxMessageSize must be positive: %s",
1✔
238
        maxMessageSize);
239

240
    final Http2Connection connection = new DefaultHttp2Connection(true);
1✔
241
    WeightedFairQueueByteDistributor dist = new WeightedFairQueueByteDistributor(connection);
1✔
242
    dist.allocationQuantum(16 * 1024); // Make benchmarks fast again.
1✔
243
    DefaultHttp2RemoteFlowController controller =
1✔
244
        new DefaultHttp2RemoteFlowController(connection, dist);
245
    connection.remote().flowController(controller);
1✔
246
    final KeepAliveEnforcer keepAliveEnforcer = new KeepAliveEnforcer(
1✔
247
        permitKeepAliveWithoutCalls, permitKeepAliveTimeInNanos, TimeUnit.NANOSECONDS);
248

249
    // Create the local flow controller configured to auto-refill the connection window.
250
    connection.local().flowController(
1✔
251
        new DefaultHttp2LocalFlowController(connection, DEFAULT_WINDOW_UPDATE_RATIO, true));
252
    frameWriter = new WriteMonitoringFrameWriter(frameWriter, keepAliveEnforcer);
1✔
253
    Http2ConnectionEncoder encoder =
1✔
254
        new DefaultHttp2ConnectionEncoder(connection, frameWriter);
255
    encoder = new Http2ControlFrameLimitEncoder(encoder, 10000);
1✔
256
    Http2ConnectionDecoder decoder = new DefaultHttp2ConnectionDecoder(connection, encoder,
1✔
257
        frameReader);
258

259
    Http2Settings settings = new Http2Settings();
1✔
260
    settings.initialWindowSize(flowControlWindow);
1✔
261
    settings.maxConcurrentStreams(maxStreams);
1✔
262
    settings.maxHeaderListSize(maxHeaderListSize);
1✔
263

264
    if (ticker == null) {
1✔
265
      ticker = Ticker.systemTicker();
×
266
    }
267

268
    return new NettyServerHandler(
1✔
269
        channelUnused,
270
        connection,
271
        transportListener,
272
        streamTracerFactories,
273
        transportTracer,
274
        decoder, encoder, settings,
275
        maxMessageSize,
276
        keepAliveTimeInNanos, keepAliveTimeoutInNanos,
277
        maxConnectionIdleInNanos,
278
        maxConnectionAgeInNanos, maxConnectionAgeGraceInNanos,
279
        keepAliveEnforcer,
280
        autoFlowControl,
281
        maxRstCount,
282
        maxRstPeriodNanos,
283
        eagAttributes, ticker);
284
  }
285

286
  /**
   * Stores the configuration, registers a connection listener that reports active/idle
   * transitions, and installs the frame listener on the decoder.
   *
   * <p>The connection listener notifies the keepalive enforcer (and the idle manager, when idle
   * timeouts are enabled) whenever the number of active streams becomes one or drops to zero.
   *
   * @throws IllegalArgumentException if {@code maxMessageSize} is negative
   * @throws NullPointerException if a required collaborator is {@code null}
   */
  private NettyServerHandler(
      ChannelPromise channelUnused,
      final Http2Connection connection,
      ServerTransportListener transportListener,
      List<? extends ServerStreamTracer.Factory> streamTracerFactories,
      TransportTracer transportTracer,
      Http2ConnectionDecoder decoder,
      Http2ConnectionEncoder encoder,
      Http2Settings settings,
      int maxMessageSize,
      long keepAliveTimeInNanos,
      long keepAliveTimeoutInNanos,
      long maxConnectionIdleInNanos,
      long maxConnectionAgeInNanos,
      long maxConnectionAgeGraceInNanos,
      final KeepAliveEnforcer keepAliveEnforcer,
      boolean autoFlowControl,
      int maxRstCount,
      long maxRstPeriodNanos,
      Attributes eagAttributes,
      Ticker ticker) {
    super(channelUnused, decoder, encoder, settings, new ServerChannelLogger(),
        autoFlowControl, null, ticker);

    // A null manager means idle timeouts are disabled for this connection.
    final MaxConnectionIdleManager idleManager =
        (maxConnectionIdleInNanos == MAX_CONNECTION_IDLE_NANOS_DISABLED)
            ? null
            : new MaxConnectionIdleManager(maxConnectionIdleInNanos);

    connection.addListener(new Http2ConnectionAdapter() {
      @Override
      public void onStreamActive(Http2Stream stream) {
        if (connection.numActiveStreams() != 1) {
          return;
        }
        // First active stream: the transport is no longer idle.
        keepAliveEnforcer.onTransportActive();
        if (idleManager != null) {
          idleManager.onTransportActive();
        }
      }

      @Override
      public void onStreamClosed(Http2Stream stream) {
        if (connection.numActiveStreams() != 0) {
          return;
        }
        // Last stream closed: the transport is idle again.
        keepAliveEnforcer.onTransportIdle();
        if (idleManager != null) {
          idleManager.onTransportIdle();
        }
      }
    });

    checkArgument(maxMessageSize >= 0, "maxMessageSize must be non-negative: %s", maxMessageSize);
    this.maxMessageSize = maxMessageSize;
    this.maxConnectionIdleManager = idleManager;
    this.keepAliveTimeInNanos = keepAliveTimeInNanos;
    this.keepAliveTimeoutInNanos = keepAliveTimeoutInNanos;
    this.maxConnectionAgeInNanos = maxConnectionAgeInNanos;
    this.maxConnectionAgeGraceInNanos = maxConnectionAgeGraceInNanos;
    this.maxRstCount = maxRstCount;
    this.maxRstPeriodNanos = maxRstPeriodNanos;
    this.keepAliveEnforcer = checkNotNull(keepAliveEnforcer, "keepAliveEnforcer");
    this.eagAttributes = checkNotNull(eagAttributes, "eagAttributes");
    this.ticker = checkNotNull(ticker, "ticker");

    // The RST_STREAM rate-limit window starts when the handler is created.
    this.lastRstNanoTime = ticker.read();
    this.streamKey = encoder.connection().newKey();
    this.transportListener = checkNotNull(transportListener, "transportListener");
    this.streamTracerFactories = checkNotNull(streamTracerFactories, "streamTracerFactories");
    this.transportTracer = checkNotNull(transportTracer, "transportTracer");

    // Set the frame listener on the decoder.
    decoder().frameListener(new FrameListener());
  }
1✔
360

361
  @Nullable
362
  Throwable connectionError() {
363
    return connectionError;
1✔
364
  }
365

366
  @Override
367
  public void handlerAdded(final ChannelHandlerContext ctx) throws Exception {
368
    serverWriteQueue = new WriteQueue(ctx.channel());
1✔
369

370
    // init max connection age monitor
371
    if (maxConnectionAgeInNanos != MAX_CONNECTION_AGE_NANOS_DISABLED) {
1✔
372
      maxConnectionAgeMonitor = ctx.executor().schedule(
1✔
373
          new LogExceptionRunnable(new Runnable() {
1✔
374
            @Override
375
            public void run() {
376
              if (gracefulShutdown == null) {
1✔
377
                gracefulShutdown = new GracefulShutdown("max_age", maxConnectionAgeGraceInNanos);
1✔
378
                gracefulShutdown.start(ctx);
1✔
379
                ctx.flush();
1✔
380
              }
381
            }
1✔
382
          }),
383
          maxConnectionAgeInNanos,
384
          TimeUnit.NANOSECONDS);
385
    }
386

387
    if (maxConnectionIdleManager != null) {
1✔
388
      maxConnectionIdleManager.start(new Runnable() {
1✔
389
        @Override
390
        public void run() {
391
          if (gracefulShutdown == null) {
1✔
392
            gracefulShutdown = new GracefulShutdown("max_idle", null);
1✔
393
            gracefulShutdown.start(ctx);
1✔
394
            ctx.flush();
1✔
395
          }
396
        }
1✔
397
      }, ctx.executor());
1✔
398
    }
399

400
    if (keepAliveTimeInNanos != SERVER_KEEPALIVE_TIME_NANOS_DISABLED) {
1✔
401
      keepAliveManager = new KeepAliveManager(new KeepAlivePinger(ctx), ctx.executor(),
1✔
402
          keepAliveTimeInNanos, keepAliveTimeoutInNanos, true /* keepAliveDuringTransportIdle */);
403
      keepAliveManager.onTransportStarted();
1✔
404
    }
405

406
    assert encoder().connection().equals(decoder().connection());
1✔
407
    transportTracer.setFlowControlWindowReader(new Utils.FlowControlReader(encoder().connection()));
1✔
408

409
    super.handlerAdded(ctx);
1✔
410
  }
1✔
411

412
  private void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers)
413
      throws Http2Exception {
414
    try {
415
      // Connection-specific header fields makes a request malformed. Ideally this would be handled
416
      // by Netty. RFC 7540 section 8.1.2.2
417
      if (!DISABLE_CONNECTION_HEADER_CHECK && headers.contains(CONNECTION)) {
1✔
418
        resetStream(ctx, streamId, Http2Error.PROTOCOL_ERROR.code(), ctx.newPromise());
×
419
        return;
×
420
      }
421

422
      if (headers.authority() == null) {
1✔
423
        List<CharSequence> hosts = headers.getAll(HOST);
1✔
424
        if (hosts.size() > 1) {
1✔
425
          // RFC 7230 section 5.4
426
          respondWithHttpError(ctx, streamId, 400, Status.Code.INTERNAL,
1✔
427
              "Multiple host headers");
428
          return;
1✔
429
        }
430
        if (!hosts.isEmpty()) {
1✔
431
          headers.add(AUTHORITY.value(), hosts.get(0));
1✔
432
        }
433
      }
434
      headers.remove(HOST);
1✔
435

436
      // Remove the leading slash of the path and get the fully qualified method name
437
      CharSequence path = headers.path();
1✔
438

439
      if (path == null) {
1✔
440
        respondWithHttpError(ctx, streamId, 404, Status.Code.UNIMPLEMENTED,
1✔
441
            "Expected path but is missing");
442
        return;
1✔
443
      }
444

445
      if (path.charAt(0) != '/') {
1✔
446
        respondWithHttpError(ctx, streamId, 404, Status.Code.UNIMPLEMENTED,
1✔
447
            String.format("Expected path to start with /: %s", path));
1✔
448
        return;
1✔
449
      }
450

451
      String method = path.subSequence(1, path.length()).toString();
1✔
452

453
      // Verify that the Content-Type is correct in the request.
454
      CharSequence contentType = headers.get(CONTENT_TYPE_HEADER);
1✔
455
      if (contentType == null) {
1✔
456
        respondWithHttpError(
1✔
457
            ctx, streamId, 415, Status.Code.INTERNAL, "Content-Type is missing from the request");
458
        return;
1✔
459
      }
460
      String contentTypeString = contentType.toString();
1✔
461
      if (!GrpcUtil.isGrpcContentType(contentTypeString)) {
1✔
462
        respondWithHttpError(ctx, streamId, 415, Status.Code.INTERNAL,
1✔
463
            String.format("Content-Type '%s' is not supported", contentTypeString));
1✔
464
        return;
1✔
465
      }
466

467
      if (!HTTP_METHOD.contentEquals(headers.method())) {
1✔
468
        respondWithHttpError(ctx, streamId, 405, Status.Code.INTERNAL,
1✔
469
            String.format("Method '%s' is not supported", headers.method()));
1✔
470
        return;
1✔
471
      }
472

473
      if (!teWarningLogged && !TE_TRAILERS.contentEquals(headers.get(TE_HEADER))) {
1✔
474
        logger.warning(String.format("Expected header TE: %s, but %s is received. This means "
1✔
475
                + "some intermediate proxy may not support trailers",
476
            TE_TRAILERS, headers.get(TE_HEADER)));
1✔
477
        teWarningLogged = true;
1✔
478
      }
479

480
      // The Http2Stream object was put by AbstractHttp2ConnectionHandler before calling this
481
      // method.
482
      Http2Stream http2Stream = requireHttp2Stream(streamId);
1✔
483

484
      Metadata metadata = Utils.convertHeaders(headers);
1✔
485
      StatsTraceContext statsTraceCtx =
1✔
486
          StatsTraceContext.newServerContext(streamTracerFactories, method, metadata);
1✔
487

488
      NettyServerStream.TransportState state = new NettyServerStream.TransportState(
1✔
489
          this,
490
          ctx.channel().eventLoop(),
1✔
491
          http2Stream,
492
          maxMessageSize,
493
          statsTraceCtx,
494
          transportTracer,
495
          method);
496

497
      try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.onHeadersRead")) {
1✔
498
        PerfMark.attachTag(state.tag());
1✔
499
        String authority = getOrUpdateAuthority((AsciiString) headers.authority());
1✔
500
        NettyServerStream stream = new NettyServerStream(
1✔
501
            ctx.channel(),
1✔
502
            state,
503
            attributes,
504
            authority,
505
            statsTraceCtx);
506
        transportListener.streamCreated(stream, method, metadata);
1✔
507
        state.onStreamAllocated();
1✔
508
        http2Stream.setProperty(streamKey, state);
1✔
509
      }
510
    } catch (Exception e) {
×
511
      logger.log(Level.WARNING, "Exception in onHeadersRead()", e);
×
512
      // Throw an exception that will get handled by onStreamError.
513
      throw newStreamException(streamId, e);
×
514
    }
1✔
515
  }
1✔
516

517
  private String getOrUpdateAuthority(AsciiString authority) {
518
    if (authority == null) {
1✔
519
      return null;
1✔
520
    } else if (!authority.equals(lastKnownAuthority)) {
1✔
521
      lastKnownAuthority = authority;
1✔
522
    }
523

524
    // AsciiString.toString() is internally cached, so subsequent calls will not
525
    // result in recomputing the String representation of lastKnownAuthority.
526
    return lastKnownAuthority.toString();
1✔
527
  }
528

529
  private void onDataRead(int streamId, ByteBuf data, int padding, boolean endOfStream)
530
      throws Http2Exception {
531
    flowControlPing().onDataRead(data.readableBytes(), padding);
1✔
532
    try {
533
      NettyServerStream.TransportState stream = serverStream(requireHttp2Stream(streamId));
1✔
534
      if (stream == null) {
1✔
535
        return;
1✔
536
      }
537
      try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.onDataRead")) {
1✔
538
        PerfMark.attachTag(stream.tag());
1✔
539
        stream.inboundDataReceived(data, endOfStream);
1✔
540
      }
541
    } catch (Throwable e) {
×
542
      logger.log(Level.WARNING, "Exception in onDataRead()", e);
×
543
      // Throw an exception that will get handled by onStreamError.
544
      throw newStreamException(streamId, e);
×
545
    }
1✔
546
  }
1✔
547

548
  private void onRstStreamRead(int streamId, long errorCode) throws Http2Exception {
549
    if (maxRstCount > 0) {
1✔
550
      long now = ticker.read();
1✔
551
      if (now - lastRstNanoTime > maxRstPeriodNanos) {
1✔
552
        lastRstNanoTime = now;
1✔
553
        rstCount = 1;
1✔
554
      } else {
555
        rstCount++;
1✔
556
        if (rstCount > maxRstCount) {
1✔
557
          throw new Http2Exception(Http2Error.ENHANCE_YOUR_CALM, "too_many_rststreams") {
1✔
558
            @SuppressWarnings("UnsynchronizedOverridesSynchronized") // No memory accesses
559
            @Override
560
            public Throwable fillInStackTrace() {
561
              // Avoid the CPU cycles, since the resets may be a CPU consumption attack
562
              return this;
1✔
563
            }
564
          };
565
        }
566
      }
567
    }
568

569
    try {
570
      NettyServerStream.TransportState stream = serverStream(connection().stream(streamId));
1✔
571
      if (stream != null) {
1✔
572
        try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.onRstStreamRead")) {
1✔
573
          PerfMark.attachTag(stream.tag());
1✔
574
          stream.transportReportStatus(
1✔
575
              Status.CANCELLED.withDescription("RST_STREAM received for code " + errorCode));
1✔
576
        }
577
      }
578
    } catch (Throwable e) {
1✔
579
      logger.log(Level.WARNING, "Exception in onRstStreamRead()", e);
1✔
580
      // Throw an exception that will get handled by onStreamError.
581
      throw newStreamException(streamId, e);
1✔
582
    }
1✔
583
  }
1✔
584

585
  @Override
586
  protected void onConnectionError(ChannelHandlerContext ctx, boolean outbound, Throwable cause,
587
      Http2Exception http2Ex) {
588
    logger.log(Level.FINE, "Connection Error", cause);
1✔
589
    connectionError = cause;
1✔
590
    super.onConnectionError(ctx, outbound, cause, http2Ex);
1✔
591
  }
1✔
592

593
  @Override
594
  protected void onStreamError(ChannelHandlerContext ctx, boolean outbound, Throwable cause,
595
      StreamException http2Ex) {
596
    NettyServerStream.TransportState serverStream = serverStream(
1✔
597
        connection().stream(Http2Exception.streamId(http2Ex)));
1✔
598
    Level level = Level.WARNING;
1✔
599
    if (serverStream == null && http2Ex.error() == Http2Error.STREAM_CLOSED) {
1✔
600
      level = Level.FINE;
×
601
    }
602
    logger.log(level, "Stream Error", cause);
1✔
603
    Tag tag = serverStream != null ? serverStream.tag() : PerfMark.createTag();
1✔
604
    try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.onStreamError")) {
1✔
605
      PerfMark.attachTag(tag);
1✔
606
      if (serverStream != null) {
1✔
607
        serverStream.transportReportStatus(Utils.statusFromThrowable(cause));
1✔
608
      }
609
      // TODO(ejona): Abort the stream by sending headers to help the client with debugging.
610
      // Delegate to the base class to send a RST_STREAM.
611
      super.onStreamError(ctx, outbound, cause, http2Ex);
1✔
612
    }
613
  }
1✔
614

615
  @Override
616
  public void handleProtocolNegotiationCompleted(
617
      Attributes attrs, InternalChannelz.Security securityInfo) {
618
    negotiationAttributes = attrs;
1✔
619
    this.securityInfo = securityInfo;
1✔
620
    super.handleProtocolNegotiationCompleted(attrs, securityInfo);
1✔
621
    NettyClientHandler.writeBufferingAndRemove(ctx().channel());
1✔
622
  }
1✔
623

624
  @Override
625
  public Attributes getEagAttributes() {
626
    return eagAttributes;
1✔
627
  }
628

629
  InternalChannelz.Security getSecurityInfo() {
630
    return securityInfo;
1✔
631
  }
632

633
  @VisibleForTesting
634
  KeepAliveManager getKeepAliveManagerForTest() {
635
    return keepAliveManager;
1✔
636
  }
637

638
  @VisibleForTesting
639
  void setKeepAliveManagerForTest(KeepAliveManager keepAliveManager) {
640
    this.keepAliveManager = keepAliveManager;
1✔
641
  }
1✔
642

643
  /**
644
   * Handler for the Channel shutting down.
645
   */
646
  @Override
647
  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
648
    try {
649
      if (keepAliveManager != null) {
1✔
650
        keepAliveManager.onTransportTermination();
1✔
651
      }
652
      if (maxConnectionIdleManager != null) {
1✔
653
        maxConnectionIdleManager.onTransportTermination();
1✔
654
      }
655
      if (maxConnectionAgeMonitor != null) {
1✔
656
        maxConnectionAgeMonitor.cancel(false);
1✔
657
      }
658
      final Status status =
1✔
659
          Status.UNAVAILABLE.withDescription("connection terminated for unknown reason");
1✔
660
      // Any streams that are still active must be closed
661
      connection().forEachActiveStream(new Http2StreamVisitor() {
1✔
662
        @Override
663
        public boolean visit(Http2Stream stream) throws Http2Exception {
664
          NettyServerStream.TransportState serverStream = serverStream(stream);
1✔
665
          if (serverStream != null) {
1✔
666
            serverStream.transportReportStatus(status);
1✔
667
          }
668
          return true;
1✔
669
        }
670
      });
671
    } finally {
672
      super.channelInactive(ctx);
1✔
673
    }
674
  }
1✔
675

676
  WriteQueue getWriteQueue() {
677
    return serverWriteQueue;
1✔
678
  }
679

680
  /** Handler for commands sent from the stream. */
681
  @Override
682
  public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
683
      throws Exception {
684
    if (msg instanceof SendGrpcFrameCommand) {
1✔
685
      sendGrpcFrame(ctx, (SendGrpcFrameCommand) msg, promise);
1✔
686
    } else if (msg instanceof SendResponseHeadersCommand) {
1✔
687
      sendResponseHeaders(ctx, (SendResponseHeadersCommand) msg, promise);
1✔
688
    } else if (msg instanceof CancelServerStreamCommand) {
1✔
689
      cancelStream(ctx, (CancelServerStreamCommand) msg, promise);
1✔
690
    } else if (msg instanceof GracefulServerCloseCommand) {
1✔
691
      gracefulClose(ctx, (GracefulServerCloseCommand) msg, promise);
1✔
692
    } else if (msg instanceof ForcefulCloseCommand) {
1✔
693
      forcefulClose(ctx, (ForcefulCloseCommand) msg, promise);
1✔
694
    } else {
695
      AssertionError e =
×
696
          new AssertionError("Write called for unexpected type: " + msg.getClass().getName());
×
697
      ReferenceCountUtil.release(msg);
×
698
      promise.setFailure(e);
×
699
      throw e;
×
700
    }
701
  }
1✔
702

703
  @Override
704
  public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
705
    gracefulClose(ctx, new GracefulServerCloseCommand("app_requested"), promise);
1✔
706
    ctx.flush();
1✔
707
  }
1✔
708

709
  /**
710
   * Returns the given processed bytes back to inbound flow control.
711
   */
712
  void returnProcessedBytes(Http2Stream http2Stream, int bytes) {
713
    try {
714
      decoder().flowController().consumeBytes(http2Stream, bytes);
1✔
715
    } catch (Http2Exception e) {
×
716
      throw new RuntimeException(e);
×
717
    }
1✔
718
  }
1✔
719

720
  private void closeStreamWhenDone(ChannelPromise promise, Http2Stream stream) {
721
    promise.addListener(
1✔
722
        new ChannelFutureListener() {
1✔
723
          @Override
724
          public void operationComplete(ChannelFuture future) {
725
            serverStream(stream).complete();
1✔
726
          }
1✔
727
        });
728
  }
1✔
729

730
  private static void streamGone(int streamId, ChannelPromise promise) {
731
    promise.setFailure(
1✔
732
        new IllegalStateException(
733
            "attempting to write to stream " + streamId + " that no longer exists") {
1✔
734
          @Override
735
          public synchronized Throwable fillInStackTrace() {
736
            return this;
1✔
737
          }
738
        });
739
  }
1✔
740

741
  /** Sends the given gRPC frame to the client. */
742
  private void sendGrpcFrame(
743
      ChannelHandlerContext ctx, SendGrpcFrameCommand cmd, ChannelPromise promise)
744
      throws Http2Exception {
745
    try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.sendGrpcFrame")) {
1✔
746
      PerfMark.attachTag(cmd.stream().tag());
1✔
747
      PerfMark.linkIn(cmd.getLink());
1✔
748
      int streamId = cmd.stream().id();
1✔
749
      Http2Stream stream = connection().stream(streamId);
1✔
750
      if (stream == null) {
1✔
751
        streamGone(streamId, promise);
1✔
752
        return;
1✔
753
      }
754
      if (cmd.endStream()) {
1✔
755
        closeStreamWhenDone(promise, stream);
×
756
      }
757
      // Call the base class to write the HTTP/2 DATA frame.
758
      encoder().writeData(ctx, streamId, cmd.content(), 0, cmd.endStream(), promise);
1✔
759
    }
760
  }
1✔
761

762
  /**
763
   * Sends the response headers to the client.
764
   */
765
  private void sendResponseHeaders(ChannelHandlerContext ctx, SendResponseHeadersCommand cmd,
766
      ChannelPromise promise) throws Http2Exception {
767
    try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.sendResponseHeaders")) {
1✔
768
      PerfMark.attachTag(cmd.stream().tag());
1✔
769
      PerfMark.linkIn(cmd.getLink());
1✔
770
      int streamId = cmd.stream().id();
1✔
771
      Http2Stream stream = connection().stream(streamId);
1✔
772
      if (stream == null) {
1✔
773
        streamGone(streamId, promise);
1✔
774
        return;
1✔
775
      }
776
      if (cmd.endOfStream()) {
1✔
777
        closeStreamWhenDone(promise, stream);
1✔
778
      }
779
      encoder().writeHeaders(ctx, streamId, cmd.headers(), 0, cmd.endOfStream(), promise);
1✔
780
    }
781
  }
1✔
782

783
  private void cancelStream(ChannelHandlerContext ctx, CancelServerStreamCommand cmd,
784
      ChannelPromise promise) {
785
    try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.cancelStream")) {
1✔
786
      PerfMark.attachTag(cmd.stream().tag());
1✔
787
      PerfMark.linkIn(cmd.getLink());
1✔
788
      // Notify the listener if we haven't already.
789
      cmd.stream().transportReportStatus(cmd.reason());
1✔
790
      // Terminate the stream.
791
      encoder().writeRstStream(ctx, cmd.stream().id(), Http2Error.CANCEL.code(), promise);
1✔
792
    }
793
  }
1✔
794

795
  private void gracefulClose(final ChannelHandlerContext ctx, final GracefulServerCloseCommand msg,
796
      ChannelPromise promise) throws Exception {
797
    // Ideally we'd adjust a pre-existing graceful shutdown's grace period to at least what is
798
    // requested here. But that's an edge case and seems bug-prone.
799
    if (gracefulShutdown == null) {
1✔
800
      Long graceTimeInNanos = null;
1✔
801
      if (msg.getGraceTimeUnit() != null) {
1✔
802
        graceTimeInNanos = msg.getGraceTimeUnit().toNanos(msg.getGraceTime());
1✔
803
      }
804
      gracefulShutdown = new GracefulShutdown(msg.getGoAwayDebugString(), graceTimeInNanos);
1✔
805
      gracefulShutdown.start(ctx);
1✔
806
    }
807
    promise.setSuccess();
1✔
808
  }
1✔
809

810
  private void forcefulClose(final ChannelHandlerContext ctx, final ForcefulCloseCommand msg,
811
      ChannelPromise promise) throws Exception {
812
    super.close(ctx, promise);
1✔
813
    connection().forEachActiveStream(new Http2StreamVisitor() {
1✔
814
      @Override
815
      public boolean visit(Http2Stream stream) throws Http2Exception {
816
        NettyServerStream.TransportState serverStream = serverStream(stream);
1✔
817
        if (serverStream != null) {
1✔
818
          try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.forcefulClose")) {
1✔
819
            PerfMark.attachTag(serverStream.tag());
1✔
820
            PerfMark.linkIn(msg.getLink());
1✔
821
            serverStream.transportReportStatus(msg.getStatus());
1✔
822
            resetStream(ctx, stream.id(), Http2Error.CANCEL.code(), ctx.newPromise());
1✔
823
          }
824
        }
825
        stream.close();
1✔
826
        return true;
1✔
827
      }
828
    });
829
  }
1✔
830

831
  private void respondWithHttpError(
832
      ChannelHandlerContext ctx, int streamId, int code, Status.Code statusCode, String msg) {
833
    Metadata metadata = new Metadata();
1✔
834
    metadata.put(InternalStatus.CODE_KEY, statusCode.toStatus());
1✔
835
    metadata.put(InternalStatus.MESSAGE_KEY, msg);
1✔
836
    byte[][] serialized = InternalMetadata.serialize(metadata);
1✔
837

838
    Http2Headers headers = new DefaultHttp2Headers(true, serialized.length / 2)
1✔
839
        .status("" + code)
1✔
840
        .set(CONTENT_TYPE_HEADER, "text/plain; charset=utf-8");
1✔
841
    for (int i = 0; i < serialized.length; i += 2) {
1✔
842
      headers.add(new AsciiString(serialized[i], false), new AsciiString(serialized[i + 1], false));
1✔
843
    }
844
    encoder().writeHeaders(ctx, streamId, headers, 0, false, ctx.newPromise());
1✔
845
    ByteBuf msgBuf = ByteBufUtil.writeUtf8(ctx.alloc(), msg);
1✔
846
    encoder().writeData(ctx, streamId, msgBuf, 0, true, ctx.newPromise());
1✔
847
  }
1✔
848

849
  private Http2Stream requireHttp2Stream(int streamId) {
850
    Http2Stream stream = connection().stream(streamId);
1✔
851
    if (stream == null) {
1✔
852
      // This should never happen.
853
      throw new AssertionError("Stream does not exist: " + streamId);
×
854
    }
855
    return stream;
1✔
856
  }
857

858
  /**
859
   * Returns the server stream associated to the given HTTP/2 stream object.
860
   */
861
  private NettyServerStream.TransportState serverStream(Http2Stream stream) {
862
    return stream == null ? null : (NettyServerStream.TransportState) stream.getProperty(streamKey);
1✔
863
  }
864

865
  private Http2Exception newStreamException(int streamId, Throwable cause) {
866
    return Http2Exception.streamError(
1✔
867
        streamId, Http2Error.INTERNAL_ERROR, cause, Strings.nullToEmpty(cause.getMessage()));
1✔
868
  }
869

870
  private class FrameListener extends Http2FrameAdapter {
1✔
871
    private boolean firstSettings = true;
1✔
872

873
    @Override
874
    public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) {
875
      if (firstSettings) {
1✔
876
        firstSettings = false;
1✔
877
        // Delay transportReady until we see the client's HTTP handshake, for coverage with
878
        // handshakeTimeout
879
        attributes = transportListener.transportReady(negotiationAttributes);
1✔
880
      }
881
    }
1✔
882

883
    @Override
884
    public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
885
        boolean endOfStream) throws Http2Exception {
886
      if (keepAliveManager != null) {
1✔
887
        keepAliveManager.onDataReceived();
1✔
888
      }
889
      NettyServerHandler.this.onDataRead(streamId, data, padding, endOfStream);
1✔
890
      return padding;
1✔
891
    }
892

893
    @Override
894
    public void onHeadersRead(ChannelHandlerContext ctx,
895
        int streamId,
896
        Http2Headers headers,
897
        int streamDependency,
898
        short weight,
899
        boolean exclusive,
900
        int padding,
901
        boolean endStream) throws Http2Exception {
902
      if (keepAliveManager != null) {
1✔
903
        keepAliveManager.onDataReceived();
1✔
904
      }
905
      NettyServerHandler.this.onHeadersRead(ctx, streamId, headers);
1✔
906
      if (endStream) {
1✔
907
        NettyServerHandler.this.onDataRead(streamId, Unpooled.EMPTY_BUFFER, 0, endStream);
×
908
      }
909
    }
1✔
910

911
    @Override
912
    public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode)
913
        throws Http2Exception {
914
      if (keepAliveManager != null) {
1✔
915
        keepAliveManager.onDataReceived();
1✔
916
      }
917
      NettyServerHandler.this.onRstStreamRead(streamId, errorCode);
1✔
918
    }
1✔
919

920
    @Override
921
    public void onPingRead(ChannelHandlerContext ctx, long data) throws Http2Exception {
922
      if (keepAliveManager != null) {
1✔
923
        keepAliveManager.onDataReceived();
1✔
924
      }
925
      if (!keepAliveEnforcer.pingAcceptable()) {
1✔
926
        ByteBuf debugData = ByteBufUtil.writeAscii(ctx.alloc(), "too_many_pings");
1✔
927
        goAway(ctx, connection().remote().lastStreamCreated(), Http2Error.ENHANCE_YOUR_CALM.code(),
1✔
928
            debugData, ctx.newPromise());
1✔
929
        Status status = Status.RESOURCE_EXHAUSTED.withDescription("Too many pings from client");
1✔
930
        try {
931
          forcefulClose(ctx, new ForcefulCloseCommand(status), ctx.newPromise());
1✔
932
        } catch (Exception ex) {
×
933
          onError(ctx, /* outbound= */ true, ex);
×
934
        }
1✔
935
      }
936
    }
1✔
937

938
    @Override
939
    public void onPingAckRead(ChannelHandlerContext ctx, long data) throws Http2Exception {
940
      if (keepAliveManager != null) {
1✔
941
        keepAliveManager.onDataReceived();
1✔
942
      }
943
      if (data == flowControlPing().payload()) {
1✔
944
        flowControlPing().updateWindow();
1✔
945
        logger.log(Level.FINE, "Window: {0}",
1✔
946
            decoder().flowController().initialWindowSize(connection().connectionStream()));
1✔
947
      } else if (data == GRACEFUL_SHUTDOWN_PING) {
1✔
948
        if (gracefulShutdown == null) {
1✔
949
          // this should never happen
950
          logger.warning("Received GRACEFUL_SHUTDOWN_PING Ack but gracefulShutdown is null");
×
951
        } else {
952
          gracefulShutdown.secondGoAwayAndClose(ctx);
1✔
953
        }
954
      } else if (data != KEEPALIVE_PING) {
1✔
955
        logger.warning("Received unexpected ping ack. No ping outstanding");
×
956
      }
957
    }
1✔
958
  }
959

960
  private final class KeepAlivePinger implements KeepAliveManager.KeepAlivePinger {
961
    final ChannelHandlerContext ctx;
962

963
    KeepAlivePinger(ChannelHandlerContext ctx) {
1✔
964
      this.ctx = ctx;
1✔
965
    }
1✔
966

967
    @Override
968
    public void ping() {
969
      ChannelFuture pingFuture = encoder().writePing(
1✔
970
          ctx, false /* isAck */, KEEPALIVE_PING, ctx.newPromise());
1✔
971
      ctx.flush();
1✔
972
      pingFuture.addListener(new ChannelFutureListener() {
1✔
973
        @Override
974
        public void operationComplete(ChannelFuture future) throws Exception {
975
          if (future.isSuccess()) {
1✔
976
            transportTracer.reportKeepAliveSent();
1✔
977
          }
978
        }
1✔
979
      });
980
    }
1✔
981

982
    @Override
983
    public void onPingTimeout() {
984
      try {
985
        forcefulClose(
1✔
986
            ctx,
987
            new ForcefulCloseCommand(Status.UNAVAILABLE
988
                .withDescription("Keepalive failed. The connection is likely gone")),
1✔
989
            ctx.newPromise());
1✔
990
      } catch (Exception ex) {
×
991
        try {
992
          exceptionCaught(ctx, ex);
×
993
        } catch (Exception ex2) {
×
994
          logger.log(Level.WARNING, "Exception while propagating exception", ex2);
×
995
          logger.log(Level.WARNING, "Original failure", ex);
×
996
        }
×
997
      }
1✔
998
    }
1✔
999
  }
1000

1001
  private final class GracefulShutdown {
1002
    String goAwayMessage;
1003

1004
    /**
1005
     * The grace time between starting graceful shutdown and closing the netty channel,
1006
     * {@code null} is unspecified.
1007
     */
1008
    @CheckForNull
1009
    Long graceTimeInNanos;
1010

1011
    /**
1012
     * True if ping is Acked or ping is timeout.
1013
     */
1014
    boolean pingAckedOrTimeout;
1015

1016
    Future<?> pingFuture;
1017

1018
    GracefulShutdown(String goAwayMessage,
1019
        @Nullable Long graceTimeInNanos) {
1✔
1020
      this.goAwayMessage = goAwayMessage;
1✔
1021
      this.graceTimeInNanos = graceTimeInNanos;
1✔
1022
    }
1✔
1023

1024
    /**
1025
     * Sends out first GOAWAY and ping, and schedules second GOAWAY and close.
1026
     */
1027
    void start(final ChannelHandlerContext ctx) {
1028
      goAway(
1✔
1029
          ctx,
1030
          Integer.MAX_VALUE,
1031
          Http2Error.NO_ERROR.code(),
1✔
1032
          ByteBufUtil.writeAscii(ctx.alloc(), goAwayMessage),
1✔
1033
          ctx.newPromise());
1✔
1034

1035
      pingFuture = ctx.executor().schedule(
1✔
1036
          new Runnable() {
1✔
1037
            @Override
1038
            public void run() {
1039
              secondGoAwayAndClose(ctx);
1✔
1040
            }
1✔
1041
          },
1042
          GRACEFUL_SHUTDOWN_PING_TIMEOUT_NANOS,
1✔
1043
          TimeUnit.NANOSECONDS);
1044

1045
      encoder().writePing(ctx, false /* isAck */, GRACEFUL_SHUTDOWN_PING, ctx.newPromise());
1✔
1046
    }
1✔
1047

1048
    void secondGoAwayAndClose(ChannelHandlerContext ctx) {
1049
      if (pingAckedOrTimeout) {
1✔
1050
        return;
×
1051
      }
1052
      pingAckedOrTimeout = true;
1✔
1053

1054
      checkNotNull(pingFuture, "pingFuture");
1✔
1055
      pingFuture.cancel(false);
1✔
1056

1057
      // send the second GOAWAY with last stream id
1058
      goAway(
1✔
1059
          ctx,
1060
          connection().remote().lastStreamCreated(),
1✔
1061
          Http2Error.NO_ERROR.code(),
1✔
1062
          ByteBufUtil.writeAscii(ctx.alloc(), goAwayMessage),
1✔
1063
          ctx.newPromise());
1✔
1064

1065
      // gracefully shutdown with specified grace time
1066
      long savedGracefulShutdownTimeMillis = gracefulShutdownTimeoutMillis();
1✔
1067
      long overriddenGraceTime = graceTimeOverrideMillis(savedGracefulShutdownTimeMillis);
1✔
1068
      try {
1069
        gracefulShutdownTimeoutMillis(overriddenGraceTime);
1✔
1070
        NettyServerHandler.super.close(ctx, ctx.newPromise());
1✔
1071
      } catch (Exception e) {
×
1072
        onError(ctx, /* outbound= */ true, e);
×
1073
      } finally {
1074
        gracefulShutdownTimeoutMillis(savedGracefulShutdownTimeMillis);
1✔
1075
      }
1076
    }
1✔
1077

1078
    private long graceTimeOverrideMillis(long originalMillis) {
1079
      if (graceTimeInNanos == null) {
1✔
1080
        return originalMillis;
1✔
1081
      }
1082
      if (graceTimeInNanos == MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE) {
1✔
1083
        // netty treats -1 as "no timeout"
1084
        return -1L;
1✔
1085
      }
1086
      return TimeUnit.NANOSECONDS.toMillis(graceTimeInNanos);
1✔
1087
    }
1088
  }
1089

1090
  // Use a frame writer so that we know when frames are through flow control and actually being
1091
  // written.
1092
  private static class WriteMonitoringFrameWriter extends DecoratingHttp2FrameWriter {
1093
    private final KeepAliveEnforcer keepAliveEnforcer;
1094

1095
    public WriteMonitoringFrameWriter(Http2FrameWriter delegate,
1096
        KeepAliveEnforcer keepAliveEnforcer) {
1097
      super(delegate);
1✔
1098
      this.keepAliveEnforcer = keepAliveEnforcer;
1✔
1099
    }
1✔
1100

1101
    @Override
1102
    public ChannelFuture writeData(ChannelHandlerContext ctx, int streamId, ByteBuf data,
1103
        int padding, boolean endStream, ChannelPromise promise) {
1104
      keepAliveEnforcer.resetCounters();
1✔
1105
      return super.writeData(ctx, streamId, data, padding, endStream, promise);
1✔
1106
    }
1107

1108
    @Override
1109
    public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
1110
        int padding, boolean endStream, ChannelPromise promise) {
1111
      keepAliveEnforcer.resetCounters();
1✔
1112
      return super.writeHeaders(ctx, streamId, headers, padding, endStream, promise);
1✔
1113
    }
1114

1115
    @Override
1116
    public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
1117
        int streamDependency, short weight, boolean exclusive, int padding, boolean endStream,
1118
        ChannelPromise promise) {
1119
      keepAliveEnforcer.resetCounters();
×
1120
      return super.writeHeaders(ctx, streamId, headers, streamDependency, weight, exclusive,
×
1121
          padding, endStream, promise);
1122
    }
1123
  }
1124

1125
  private static class ServerChannelLogger extends ChannelLogger {
1126
    private static final Logger log = Logger.getLogger(ChannelLogger.class.getName());
1✔
1127

1128
    @Override
1129
    public void log(ChannelLogLevel level, String message) {
1130
      log.log(toJavaLogLevel(level), message);
1✔
1131
    }
1✔
1132

1133
    @Override
1134
    public void log(ChannelLogLevel level, String messageFormat, Object... args) {
1135
      log(level, MessageFormat.format(messageFormat, args));
1✔
1136
    }
1✔
1137
  }
1138

1139
  private static Level toJavaLogLevel(ChannelLogLevel level) {
1140
    switch (level) {
1✔
1141
      case ERROR:
1142
        return Level.FINE;
×
1143
      case WARNING:
1144
        return Level.FINER;
×
1145
      default:
1146
        return Level.FINEST;
1✔
1147
    }
1148
  }
1149
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc