• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19111

22 Mar 2024 04:32PM UTC coverage: 88.303% (+0.01%) from 88.291%
#19111

push

github

web-flow
Start 1.64.0 development cycle (#11030)

31200 of 35333 relevant lines covered (88.3%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.85
/../netty/src/main/java/io/grpc/netty/NettyServerHandler.java
1
/*
2
 * Copyright 2014 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.netty;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static io.grpc.internal.GrpcUtil.SERVER_KEEPALIVE_TIME_NANOS_DISABLED;
22
import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE;
23
import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_AGE_NANOS_DISABLED;
24
import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_IDLE_NANOS_DISABLED;
25
import static io.grpc.netty.Utils.CONTENT_TYPE_HEADER;
26
import static io.grpc.netty.Utils.HTTP_METHOD;
27
import static io.grpc.netty.Utils.TE_HEADER;
28
import static io.grpc.netty.Utils.TE_TRAILERS;
29
import static io.netty.handler.codec.http.HttpHeaderNames.CONNECTION;
30
import static io.netty.handler.codec.http.HttpHeaderNames.HOST;
31
import static io.netty.handler.codec.http2.DefaultHttp2LocalFlowController.DEFAULT_WINDOW_UPDATE_RATIO;
32
import static io.netty.handler.codec.http2.Http2Headers.PseudoHeaderName.AUTHORITY;
33

34
import com.google.common.annotations.VisibleForTesting;
35
import com.google.common.base.Preconditions;
36
import com.google.common.base.Strings;
37
import com.google.common.base.Ticker;
38
import io.grpc.Attributes;
39
import io.grpc.ChannelLogger;
40
import io.grpc.ChannelLogger.ChannelLogLevel;
41
import io.grpc.InternalChannelz;
42
import io.grpc.InternalMetadata;
43
import io.grpc.InternalStatus;
44
import io.grpc.Metadata;
45
import io.grpc.ServerStreamTracer;
46
import io.grpc.Status;
47
import io.grpc.internal.GrpcUtil;
48
import io.grpc.internal.KeepAliveEnforcer;
49
import io.grpc.internal.KeepAliveManager;
50
import io.grpc.internal.LogExceptionRunnable;
51
import io.grpc.internal.MaxConnectionIdleManager;
52
import io.grpc.internal.ServerTransportListener;
53
import io.grpc.internal.StatsTraceContext;
54
import io.grpc.internal.TransportTracer;
55
import io.grpc.netty.GrpcHttp2HeadersUtils.GrpcHttp2ServerHeadersDecoder;
56
import io.netty.buffer.ByteBuf;
57
import io.netty.buffer.ByteBufUtil;
58
import io.netty.buffer.Unpooled;
59
import io.netty.channel.ChannelFuture;
60
import io.netty.channel.ChannelFutureListener;
61
import io.netty.channel.ChannelHandlerContext;
62
import io.netty.channel.ChannelPromise;
63
import io.netty.handler.codec.http2.DecoratingHttp2FrameWriter;
64
import io.netty.handler.codec.http2.DefaultHttp2Connection;
65
import io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder;
66
import io.netty.handler.codec.http2.DefaultHttp2ConnectionEncoder;
67
import io.netty.handler.codec.http2.DefaultHttp2FrameReader;
68
import io.netty.handler.codec.http2.DefaultHttp2FrameWriter;
69
import io.netty.handler.codec.http2.DefaultHttp2Headers;
70
import io.netty.handler.codec.http2.DefaultHttp2LocalFlowController;
71
import io.netty.handler.codec.http2.DefaultHttp2RemoteFlowController;
72
import io.netty.handler.codec.http2.Http2Connection;
73
import io.netty.handler.codec.http2.Http2ConnectionAdapter;
74
import io.netty.handler.codec.http2.Http2ConnectionDecoder;
75
import io.netty.handler.codec.http2.Http2ConnectionEncoder;
76
import io.netty.handler.codec.http2.Http2Error;
77
import io.netty.handler.codec.http2.Http2Exception;
78
import io.netty.handler.codec.http2.Http2Exception.StreamException;
79
import io.netty.handler.codec.http2.Http2FrameAdapter;
80
import io.netty.handler.codec.http2.Http2FrameLogger;
81
import io.netty.handler.codec.http2.Http2FrameReader;
82
import io.netty.handler.codec.http2.Http2FrameWriter;
83
import io.netty.handler.codec.http2.Http2Headers;
84
import io.netty.handler.codec.http2.Http2HeadersDecoder;
85
import io.netty.handler.codec.http2.Http2InboundFrameLogger;
86
import io.netty.handler.codec.http2.Http2OutboundFrameLogger;
87
import io.netty.handler.codec.http2.Http2Settings;
88
import io.netty.handler.codec.http2.Http2Stream;
89
import io.netty.handler.codec.http2.Http2StreamVisitor;
90
import io.netty.handler.codec.http2.WeightedFairQueueByteDistributor;
91
import io.netty.handler.logging.LogLevel;
92
import io.netty.util.AsciiString;
93
import io.netty.util.ReferenceCountUtil;
94
import io.perfmark.PerfMark;
95
import io.perfmark.Tag;
96
import io.perfmark.TaskCloseable;
97
import java.text.MessageFormat;
98
import java.util.List;
99
import java.util.concurrent.Future;
100
import java.util.concurrent.ScheduledFuture;
101
import java.util.concurrent.TimeUnit;
102
import java.util.logging.Level;
103
import java.util.logging.Logger;
104
import javax.annotation.CheckForNull;
105
import javax.annotation.Nullable;
106

107
/**
108
 * Server-side Netty handler for GRPC processing. All event handlers are executed entirely within
109
 * the context of the Netty Channel thread.
110
 */
111
class NettyServerHandler extends AbstractNettyHandler {
112
  private static final Logger logger = Logger.getLogger(NettyServerHandler.class.getName());
1✔
113
  private static final long KEEPALIVE_PING = 0xDEADL;
114
  @VisibleForTesting
115
  static final long GRACEFUL_SHUTDOWN_PING = 0x97ACEF001L;
116
  private static final long GRACEFUL_SHUTDOWN_PING_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(10);
1✔
117
  /** Temporary workaround for #8674. Fine to delete after v1.45 release, and maybe earlier. */
118
  private static final boolean DISABLE_CONNECTION_HEADER_CHECK = Boolean.parseBoolean(
1✔
119
      System.getProperty("io.grpc.netty.disableConnectionHeaderCheck", "false"));
1✔
120

121
  private final Http2Connection.PropertyKey streamKey;
122
  private final ServerTransportListener transportListener;
123
  private final int maxMessageSize;
124
  private final long keepAliveTimeInNanos;
125
  private final long keepAliveTimeoutInNanos;
126
  private final long maxConnectionAgeInNanos;
127
  private final long maxConnectionAgeGraceInNanos;
128
  private final int maxRstCount;
129
  private final long maxRstPeriodNanos;
130
  private final List<? extends ServerStreamTracer.Factory> streamTracerFactories;
131
  private final TransportTracer transportTracer;
132
  private final KeepAliveEnforcer keepAliveEnforcer;
133
  private final Attributes eagAttributes;
134
  private final Ticker ticker;
135
  /** Incomplete attributes produced by negotiator. */
136
  private Attributes negotiationAttributes;
137
  private InternalChannelz.Security securityInfo;
138
  /** Completed attributes produced by transportReady. */
139
  private Attributes attributes;
140
  private Throwable connectionError;
141
  private boolean teWarningLogged;
142
  private WriteQueue serverWriteQueue;
143
  private AsciiString lastKnownAuthority;
144
  @CheckForNull
145
  private KeepAliveManager keepAliveManager;
146
  @CheckForNull
147
  private MaxConnectionIdleManager maxConnectionIdleManager;
148
  @CheckForNull
149
  private ScheduledFuture<?> maxConnectionAgeMonitor;
150
  @CheckForNull
151
  private GracefulShutdown gracefulShutdown;
152
  private int rstCount;
153
  private long lastRstNanoTime;
154

155

156
  static NettyServerHandler newHandler(
157
      ServerTransportListener transportListener,
158
      ChannelPromise channelUnused,
159
      List<? extends ServerStreamTracer.Factory> streamTracerFactories,
160
      TransportTracer transportTracer,
161
      int maxStreams,
162
      boolean autoFlowControl,
163
      int flowControlWindow,
164
      int maxHeaderListSize,
165
      int maxMessageSize,
166
      long keepAliveTimeInNanos,
167
      long keepAliveTimeoutInNanos,
168
      long maxConnectionIdleInNanos,
169
      long maxConnectionAgeInNanos,
170
      long maxConnectionAgeGraceInNanos,
171
      boolean permitKeepAliveWithoutCalls,
172
      long permitKeepAliveTimeInNanos,
173
      int maxRstCount,
174
      long maxRstPeriodNanos,
175
      Attributes eagAttributes) {
176
    Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive: %s",
1✔
177
        maxHeaderListSize);
178
    Http2FrameLogger frameLogger = new Http2FrameLogger(LogLevel.DEBUG, NettyServerHandler.class);
1✔
179
    Http2HeadersDecoder headersDecoder = new GrpcHttp2ServerHeadersDecoder(maxHeaderListSize);
1✔
180
    Http2FrameReader frameReader = new Http2InboundFrameLogger(
1✔
181
        new DefaultHttp2FrameReader(headersDecoder), frameLogger);
182
    Http2FrameWriter frameWriter =
1✔
183
        new Http2OutboundFrameLogger(new DefaultHttp2FrameWriter(), frameLogger);
184
    return newHandler(
1✔
185
        channelUnused,
186
        frameReader,
187
        frameWriter,
188
        transportListener,
189
        streamTracerFactories,
190
        transportTracer,
191
        maxStreams,
192
        autoFlowControl,
193
        flowControlWindow,
194
        maxHeaderListSize,
195
        maxMessageSize,
196
        keepAliveTimeInNanos,
197
        keepAliveTimeoutInNanos,
198
        maxConnectionIdleInNanos,
199
        maxConnectionAgeInNanos,
200
        maxConnectionAgeGraceInNanos,
201
        permitKeepAliveWithoutCalls,
202
        permitKeepAliveTimeInNanos,
203
        maxRstCount,
204
        maxRstPeriodNanos,
205
        eagAttributes,
206
        Ticker.systemTicker());
1✔
207
  }
208

209
  static NettyServerHandler newHandler(
210
      ChannelPromise channelUnused,
211
      Http2FrameReader frameReader,
212
      Http2FrameWriter frameWriter,
213
      ServerTransportListener transportListener,
214
      List<? extends ServerStreamTracer.Factory> streamTracerFactories,
215
      TransportTracer transportTracer,
216
      int maxStreams,
217
      boolean autoFlowControl,
218
      int flowControlWindow,
219
      int maxHeaderListSize,
220
      int maxMessageSize,
221
      long keepAliveTimeInNanos,
222
      long keepAliveTimeoutInNanos,
223
      long maxConnectionIdleInNanos,
224
      long maxConnectionAgeInNanos,
225
      long maxConnectionAgeGraceInNanos,
226
      boolean permitKeepAliveWithoutCalls,
227
      long permitKeepAliveTimeInNanos,
228
      int maxRstCount,
229
      long maxRstPeriodNanos,
230
      Attributes eagAttributes,
231
      Ticker ticker) {
232
    Preconditions.checkArgument(maxStreams > 0, "maxStreams must be positive: %s", maxStreams);
1✔
233
    Preconditions.checkArgument(flowControlWindow > 0, "flowControlWindow must be positive: %s",
1✔
234
        flowControlWindow);
235
    Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive: %s",
1✔
236
        maxHeaderListSize);
237
    Preconditions.checkArgument(maxMessageSize > 0, "maxMessageSize must be positive: %s",
1✔
238
        maxMessageSize);
239

240
    final Http2Connection connection = new DefaultHttp2Connection(true);
1✔
241
    WeightedFairQueueByteDistributor dist = new WeightedFairQueueByteDistributor(connection);
1✔
242
    dist.allocationQuantum(16 * 1024); // Make benchmarks fast again.
1✔
243
    DefaultHttp2RemoteFlowController controller =
1✔
244
        new DefaultHttp2RemoteFlowController(connection, dist);
245
    connection.remote().flowController(controller);
1✔
246
    final KeepAliveEnforcer keepAliveEnforcer = new KeepAliveEnforcer(
1✔
247
        permitKeepAliveWithoutCalls, permitKeepAliveTimeInNanos, TimeUnit.NANOSECONDS);
248

249
    // Create the local flow controller configured to auto-refill the connection window.
250
    connection.local().flowController(
1✔
251
        new DefaultHttp2LocalFlowController(connection, DEFAULT_WINDOW_UPDATE_RATIO, true));
252
    frameWriter = new WriteMonitoringFrameWriter(frameWriter, keepAliveEnforcer);
1✔
253
    Http2ConnectionEncoder encoder =
1✔
254
        new DefaultHttp2ConnectionEncoder(connection, frameWriter);
255
    encoder = new Http2ControlFrameLimitEncoder(encoder, 10000);
1✔
256
    Http2ConnectionDecoder decoder = new DefaultHttp2ConnectionDecoder(connection, encoder,
1✔
257
        frameReader);
258

259
    Http2Settings settings = new Http2Settings();
1✔
260
    settings.initialWindowSize(flowControlWindow);
1✔
261
    settings.maxConcurrentStreams(maxStreams);
1✔
262
    settings.maxHeaderListSize(maxHeaderListSize);
1✔
263

264
    if (ticker == null) {
1✔
265
      ticker = Ticker.systemTicker();
×
266
    }
267

268
    return new NettyServerHandler(
1✔
269
        channelUnused,
270
        connection,
271
        transportListener,
272
        streamTracerFactories,
273
        transportTracer,
274
        decoder, encoder, settings,
275
        maxMessageSize,
276
        keepAliveTimeInNanos, keepAliveTimeoutInNanos,
277
        maxConnectionIdleInNanos,
278
        maxConnectionAgeInNanos, maxConnectionAgeGraceInNanos,
279
        keepAliveEnforcer,
280
        autoFlowControl,
281
        maxRstCount,
282
        maxRstPeriodNanos,
283
        eagAttributes, ticker);
284
  }
285

286
  /**
   * Stores the configuration, registers a connection listener that reports active/idle
   * transitions to the keepalive enforcer and (when enabled) the idle manager, and installs the
   * frame listener on the decoder.
   *
   * @throws IllegalArgumentException if {@code maxMessageSize} is negative
   * @throws NullPointerException if a required collaborator is {@code null}
   */
  private NettyServerHandler(
      ChannelPromise channelUnused,
      final Http2Connection connection,
      ServerTransportListener transportListener,
      List<? extends ServerStreamTracer.Factory> streamTracerFactories,
      TransportTracer transportTracer,
      Http2ConnectionDecoder decoder,
      Http2ConnectionEncoder encoder,
      Http2Settings settings,
      int maxMessageSize,
      long keepAliveTimeInNanos,
      long keepAliveTimeoutInNanos,
      long maxConnectionIdleInNanos,
      long maxConnectionAgeInNanos,
      long maxConnectionAgeGraceInNanos,
      final KeepAliveEnforcer keepAliveEnforcer,
      boolean autoFlowControl,
      int maxRstCount,
      long maxRstPeriodNanos,
      Attributes eagAttributes,
      Ticker ticker) {
    super(channelUnused, decoder, encoder, settings, new ServerChannelLogger(),
        autoFlowControl, null, ticker);

    // A null manager means idle-connection tracking is disabled.
    final MaxConnectionIdleManager idleManager =
        (maxConnectionIdleInNanos == MAX_CONNECTION_IDLE_NANOS_DISABLED)
            ? null
            : new MaxConnectionIdleManager(maxConnectionIdleInNanos);

    connection.addListener(new Http2ConnectionAdapter() {
      @Override
      public void onStreamActive(Http2Stream stream) {
        // Only the first active stream turns the transport from idle to active.
        if (connection.numActiveStreams() != 1) {
          return;
        }
        keepAliveEnforcer.onTransportActive();
        if (idleManager != null) {
          idleManager.onTransportActive();
        }
      }

      @Override
      public void onStreamClosed(Http2Stream stream) {
        // Only the last stream closing turns the transport back to idle.
        if (connection.numActiveStreams() != 0) {
          return;
        }
        keepAliveEnforcer.onTransportIdle();
        if (idleManager != null) {
          idleManager.onTransportIdle();
        }
      }
    });

    checkArgument(maxMessageSize >= 0, "maxMessageSize must be non-negative: %s", maxMessageSize);
    this.maxMessageSize = maxMessageSize;
    this.keepAliveTimeInNanos = keepAliveTimeInNanos;
    this.keepAliveTimeoutInNanos = keepAliveTimeoutInNanos;
    this.maxConnectionIdleManager = idleManager;
    this.maxConnectionAgeInNanos = maxConnectionAgeInNanos;
    this.maxConnectionAgeGraceInNanos = maxConnectionAgeGraceInNanos;
    this.keepAliveEnforcer = checkNotNull(keepAliveEnforcer, "keepAliveEnforcer");
    this.maxRstCount = maxRstCount;
    this.maxRstPeriodNanos = maxRstPeriodNanos;
    this.eagAttributes = checkNotNull(eagAttributes, "eagAttributes");
    this.ticker = checkNotNull(ticker, "ticker");

    // The RST_STREAM counting window starts when the handler is created.
    this.lastRstNanoTime = ticker.read();
    this.streamKey = encoder.connection().newKey();
    this.transportListener = checkNotNull(transportListener, "transportListener");
    this.streamTracerFactories = checkNotNull(streamTracerFactories, "streamTracerFactories");
    this.transportTracer = checkNotNull(transportTracer, "transportTracer");

    // Set the frame listener on the decoder.
    decoder().frameListener(new FrameListener());
  }
1✔
360

361
  @Nullable
362
  Throwable connectionError() {
363
    return connectionError;
1✔
364
  }
365

366
  @Override
367
  public void handlerAdded(final ChannelHandlerContext ctx) throws Exception {
368
    serverWriteQueue = new WriteQueue(ctx.channel());
1✔
369

370
    // init max connection age monitor
371
    if (maxConnectionAgeInNanos != MAX_CONNECTION_AGE_NANOS_DISABLED) {
1✔
372
      maxConnectionAgeMonitor = ctx.executor().schedule(
1✔
373
          new LogExceptionRunnable(new Runnable() {
1✔
374
            @Override
375
            public void run() {
376
              if (gracefulShutdown == null) {
1✔
377
                gracefulShutdown = new GracefulShutdown("max_age", maxConnectionAgeGraceInNanos);
1✔
378
                gracefulShutdown.start(ctx);
1✔
379
                ctx.flush();
1✔
380
              }
381
            }
1✔
382
          }),
383
          maxConnectionAgeInNanos,
384
          TimeUnit.NANOSECONDS);
385
    }
386

387
    if (maxConnectionIdleManager != null) {
1✔
388
      maxConnectionIdleManager.start(new Runnable() {
1✔
389
        @Override
390
        public void run() {
391
          if (gracefulShutdown == null) {
1✔
392
            gracefulShutdown = new GracefulShutdown("max_idle", null);
1✔
393
            gracefulShutdown.start(ctx);
1✔
394
            ctx.flush();
1✔
395
          }
396
        }
1✔
397
      }, ctx.executor());
1✔
398
    }
399

400
    if (keepAliveTimeInNanos != SERVER_KEEPALIVE_TIME_NANOS_DISABLED) {
1✔
401
      keepAliveManager = new KeepAliveManager(new KeepAlivePinger(ctx), ctx.executor(),
1✔
402
          keepAliveTimeInNanos, keepAliveTimeoutInNanos, true /* keepAliveDuringTransportIdle */);
403
      keepAliveManager.onTransportStarted();
1✔
404
    }
405

406
    assert encoder().connection().equals(decoder().connection());
1✔
407
    transportTracer.setFlowControlWindowReader(new Utils.FlowControlReader(encoder().connection()));
1✔
408

409
    super.handlerAdded(ctx);
1✔
410
  }
1✔
411

412
  private void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers)
413
      throws Http2Exception {
414
    try {
415
      // Connection-specific header fields makes a request malformed. Ideally this would be handled
416
      // by Netty. RFC 7540 section 8.1.2.2
417
      if (!DISABLE_CONNECTION_HEADER_CHECK && headers.contains(CONNECTION)) {
1✔
418
        resetStream(ctx, streamId, Http2Error.PROTOCOL_ERROR.code(), ctx.newPromise());
×
419
        return;
×
420
      }
421

422
      if (headers.authority() == null) {
1✔
423
        List<CharSequence> hosts = headers.getAll(HOST);
1✔
424
        if (hosts.size() > 1) {
1✔
425
          // RFC 7230 section 5.4
426
          respondWithHttpError(ctx, streamId, 400, Status.Code.INTERNAL,
1✔
427
              "Multiple host headers");
428
          return;
1✔
429
        }
430
        if (!hosts.isEmpty()) {
1✔
431
          headers.add(AUTHORITY.value(), hosts.get(0));
1✔
432
        }
433
      }
434
      headers.remove(HOST);
1✔
435

436
      // Remove the leading slash of the path and get the fully qualified method name
437
      CharSequence path = headers.path();
1✔
438

439
      if (path == null) {
1✔
440
        respondWithHttpError(ctx, streamId, 404, Status.Code.UNIMPLEMENTED,
1✔
441
            "Expected path but is missing");
442
        return;
1✔
443
      }
444

445
      if (path.charAt(0) != '/') {
1✔
446
        respondWithHttpError(ctx, streamId, 404, Status.Code.UNIMPLEMENTED,
1✔
447
            String.format("Expected path to start with /: %s", path));
1✔
448
        return;
1✔
449
      }
450

451
      String method = path.subSequence(1, path.length()).toString();
1✔
452

453
      // Verify that the Content-Type is correct in the request.
454
      CharSequence contentType = headers.get(CONTENT_TYPE_HEADER);
1✔
455
      if (contentType == null) {
1✔
456
        respondWithHttpError(
1✔
457
            ctx, streamId, 415, Status.Code.INTERNAL, "Content-Type is missing from the request");
458
        return;
1✔
459
      }
460
      String contentTypeString = contentType.toString();
1✔
461
      if (!GrpcUtil.isGrpcContentType(contentTypeString)) {
1✔
462
        respondWithHttpError(ctx, streamId, 415, Status.Code.INTERNAL,
1✔
463
            String.format("Content-Type '%s' is not supported", contentTypeString));
1✔
464
        return;
1✔
465
      }
466

467
      if (!HTTP_METHOD.contentEquals(headers.method())) {
1✔
468
        respondWithHttpError(ctx, streamId, 405, Status.Code.INTERNAL,
1✔
469
            String.format("Method '%s' is not supported", headers.method()));
1✔
470
        return;
1✔
471
      }
472

473
      if (!teWarningLogged && !TE_TRAILERS.contentEquals(headers.get(TE_HEADER))) {
1✔
474
        logger.warning(String.format("Expected header TE: %s, but %s is received. This means "
1✔
475
                + "some intermediate proxy may not support trailers",
476
            TE_TRAILERS, headers.get(TE_HEADER)));
1✔
477
        teWarningLogged = true;
1✔
478
      }
479

480
      // The Http2Stream object was put by AbstractHttp2ConnectionHandler before calling this
481
      // method.
482
      Http2Stream http2Stream = requireHttp2Stream(streamId);
1✔
483

484
      Metadata metadata = Utils.convertHeaders(headers);
1✔
485
      StatsTraceContext statsTraceCtx =
1✔
486
          StatsTraceContext.newServerContext(streamTracerFactories, method, metadata);
1✔
487

488
      NettyServerStream.TransportState state = new NettyServerStream.TransportState(
1✔
489
          this,
490
          ctx.channel().eventLoop(),
1✔
491
          http2Stream,
492
          maxMessageSize,
493
          statsTraceCtx,
494
          transportTracer,
495
          method);
496

497
      try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.onHeadersRead")) {
1✔
498
        PerfMark.attachTag(state.tag());
1✔
499
        String authority = getOrUpdateAuthority((AsciiString) headers.authority());
1✔
500
        NettyServerStream stream = new NettyServerStream(
1✔
501
            ctx.channel(),
1✔
502
            state,
503
            attributes,
504
            authority,
505
            statsTraceCtx,
506
            transportTracer);
507
        transportListener.streamCreated(stream, method, metadata);
1✔
508
        state.onStreamAllocated();
1✔
509
        http2Stream.setProperty(streamKey, state);
1✔
510
      }
511
    } catch (Exception e) {
×
512
      logger.log(Level.WARNING, "Exception in onHeadersRead()", e);
×
513
      // Throw an exception that will get handled by onStreamError.
514
      throw newStreamException(streamId, e);
×
515
    }
1✔
516
  }
1✔
517

518
  private String getOrUpdateAuthority(AsciiString authority) {
519
    if (authority == null) {
1✔
520
      return null;
1✔
521
    } else if (!authority.equals(lastKnownAuthority)) {
1✔
522
      lastKnownAuthority = authority;
1✔
523
    }
524

525
    // AsciiString.toString() is internally cached, so subsequent calls will not
526
    // result in recomputing the String representation of lastKnownAuthority.
527
    return lastKnownAuthority.toString();
1✔
528
  }
529

530
  private void onDataRead(int streamId, ByteBuf data, int padding, boolean endOfStream)
531
      throws Http2Exception {
532
    flowControlPing().onDataRead(data.readableBytes(), padding);
1✔
533
    try {
534
      NettyServerStream.TransportState stream = serverStream(requireHttp2Stream(streamId));
1✔
535
      if (stream == null) {
1✔
536
        return;
1✔
537
      }
538
      try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.onDataRead")) {
1✔
539
        PerfMark.attachTag(stream.tag());
1✔
540
        stream.inboundDataReceived(data, endOfStream);
1✔
541
      }
542
    } catch (Throwable e) {
×
543
      logger.log(Level.WARNING, "Exception in onDataRead()", e);
×
544
      // Throw an exception that will get handled by onStreamError.
545
      throw newStreamException(streamId, e);
×
546
    }
1✔
547
  }
1✔
548

549
  private void onRstStreamRead(int streamId, long errorCode) throws Http2Exception {
550
    if (maxRstCount > 0) {
1✔
551
      long now = ticker.read();
1✔
552
      if (now - lastRstNanoTime > maxRstPeriodNanos) {
1✔
553
        lastRstNanoTime = now;
1✔
554
        rstCount = 1;
1✔
555
      } else {
556
        rstCount++;
1✔
557
        if (rstCount > maxRstCount) {
1✔
558
          throw new Http2Exception(Http2Error.ENHANCE_YOUR_CALM, "too_many_rststreams") {
1✔
559
            @SuppressWarnings("UnsynchronizedOverridesSynchronized") // No memory accesses
560
            @Override
561
            public Throwable fillInStackTrace() {
562
              // Avoid the CPU cycles, since the resets may be a CPU consumption attack
563
              return this;
1✔
564
            }
565
          };
566
        }
567
      }
568
    }
569

570
    try {
571
      NettyServerStream.TransportState stream = serverStream(connection().stream(streamId));
1✔
572
      if (stream != null) {
1✔
573
        try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.onRstStreamRead")) {
1✔
574
          PerfMark.attachTag(stream.tag());
1✔
575
          stream.transportReportStatus(
1✔
576
              Status.CANCELLED.withDescription("RST_STREAM received for code " + errorCode));
1✔
577
        }
578
      }
579
    } catch (Throwable e) {
1✔
580
      logger.log(Level.WARNING, "Exception in onRstStreamRead()", e);
1✔
581
      // Throw an exception that will get handled by onStreamError.
582
      throw newStreamException(streamId, e);
1✔
583
    }
1✔
584
  }
1✔
585

586
  @Override
587
  protected void onConnectionError(ChannelHandlerContext ctx, boolean outbound, Throwable cause,
588
      Http2Exception http2Ex) {
589
    logger.log(Level.FINE, "Connection Error", cause);
1✔
590
    connectionError = cause;
1✔
591
    super.onConnectionError(ctx, outbound, cause, http2Ex);
1✔
592
  }
1✔
593

594
  @Override
595
  protected void onStreamError(ChannelHandlerContext ctx, boolean outbound, Throwable cause,
596
      StreamException http2Ex) {
597
    NettyServerStream.TransportState serverStream = serverStream(
1✔
598
        connection().stream(Http2Exception.streamId(http2Ex)));
1✔
599
    Level level = Level.WARNING;
1✔
600
    if (serverStream == null && http2Ex.error() == Http2Error.STREAM_CLOSED) {
1✔
601
      level = Level.FINE;
×
602
    }
603
    logger.log(level, "Stream Error", cause);
1✔
604
    Tag tag = serverStream != null ? serverStream.tag() : PerfMark.createTag();
1✔
605
    try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.onStreamError")) {
1✔
606
      PerfMark.attachTag(tag);
1✔
607
      if (serverStream != null) {
1✔
608
        serverStream.transportReportStatus(Utils.statusFromThrowable(cause));
1✔
609
      }
610
      // TODO(ejona): Abort the stream by sending headers to help the client with debugging.
611
      // Delegate to the base class to send a RST_STREAM.
612
      super.onStreamError(ctx, outbound, cause, http2Ex);
1✔
613
    }
614
  }
1✔
615

616
  @Override
617
  public void handleProtocolNegotiationCompleted(
618
      Attributes attrs, InternalChannelz.Security securityInfo) {
619
    negotiationAttributes = attrs;
1✔
620
    this.securityInfo = securityInfo;
1✔
621
    super.handleProtocolNegotiationCompleted(attrs, securityInfo);
1✔
622
    NettyClientHandler.writeBufferingAndRemove(ctx().channel());
1✔
623
  }
1✔
624

625
  @Override
626
  public Attributes getEagAttributes() {
627
    return eagAttributes;
1✔
628
  }
629

630
  InternalChannelz.Security getSecurityInfo() {
631
    return securityInfo;
1✔
632
  }
633

634
  @VisibleForTesting
635
  KeepAliveManager getKeepAliveManagerForTest() {
636
    return keepAliveManager;
1✔
637
  }
638

639
  @VisibleForTesting
640
  void setKeepAliveManagerForTest(KeepAliveManager keepAliveManager) {
641
    this.keepAliveManager = keepAliveManager;
1✔
642
  }
1✔
643

644
  /**
645
   * Handler for the Channel shutting down.
646
   */
647
  @Override
648
  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
649
    try {
650
      if (keepAliveManager != null) {
1✔
651
        keepAliveManager.onTransportTermination();
1✔
652
      }
653
      if (maxConnectionIdleManager != null) {
1✔
654
        maxConnectionIdleManager.onTransportTermination();
1✔
655
      }
656
      if (maxConnectionAgeMonitor != null) {
1✔
657
        maxConnectionAgeMonitor.cancel(false);
1✔
658
      }
659
      final Status status =
1✔
660
          Status.UNAVAILABLE.withDescription("connection terminated for unknown reason");
1✔
661
      // Any streams that are still active must be closed
662
      connection().forEachActiveStream(new Http2StreamVisitor() {
1✔
663
        @Override
664
        public boolean visit(Http2Stream stream) throws Http2Exception {
665
          NettyServerStream.TransportState serverStream = serverStream(stream);
1✔
666
          if (serverStream != null) {
1✔
667
            serverStream.transportReportStatus(status);
1✔
668
          }
669
          return true;
1✔
670
        }
671
      });
672
    } finally {
673
      super.channelInactive(ctx);
1✔
674
    }
675
  }
1✔
676

677
  WriteQueue getWriteQueue() {
678
    return serverWriteQueue;
1✔
679
  }
680

681
  /** Handler for commands sent from the stream. */
682
  @Override
683
  public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
684
      throws Exception {
685
    if (msg instanceof SendGrpcFrameCommand) {
1✔
686
      sendGrpcFrame(ctx, (SendGrpcFrameCommand) msg, promise);
1✔
687
    } else if (msg instanceof SendResponseHeadersCommand) {
1✔
688
      sendResponseHeaders(ctx, (SendResponseHeadersCommand) msg, promise);
1✔
689
    } else if (msg instanceof CancelServerStreamCommand) {
1✔
690
      cancelStream(ctx, (CancelServerStreamCommand) msg, promise);
1✔
691
    } else if (msg instanceof GracefulServerCloseCommand) {
1✔
692
      gracefulClose(ctx, (GracefulServerCloseCommand) msg, promise);
1✔
693
    } else if (msg instanceof ForcefulCloseCommand) {
1✔
694
      forcefulClose(ctx, (ForcefulCloseCommand) msg, promise);
1✔
695
    } else {
696
      AssertionError e =
×
697
          new AssertionError("Write called for unexpected type: " + msg.getClass().getName());
×
698
      ReferenceCountUtil.release(msg);
×
699
      promise.setFailure(e);
×
700
      throw e;
×
701
    }
702
  }
1✔
703

704
  @Override
705
  public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
706
    gracefulClose(ctx, new GracefulServerCloseCommand("app_requested"), promise);
1✔
707
    ctx.flush();
1✔
708
  }
1✔
709

710
  /**
711
   * Returns the given processed bytes back to inbound flow control.
712
   */
713
  void returnProcessedBytes(Http2Stream http2Stream, int bytes) {
714
    try {
715
      decoder().flowController().consumeBytes(http2Stream, bytes);
1✔
716
    } catch (Http2Exception e) {
×
717
      throw new RuntimeException(e);
×
718
    }
1✔
719
  }
1✔
720

721
  private void closeStreamWhenDone(ChannelPromise promise, Http2Stream stream) {
722
    promise.addListener(
1✔
723
        new ChannelFutureListener() {
1✔
724
          @Override
725
          public void operationComplete(ChannelFuture future) {
726
            serverStream(stream).complete();
1✔
727
          }
1✔
728
        });
729
  }
1✔
730

731
  private static void streamGone(int streamId, ChannelPromise promise) {
732
    promise.setFailure(
1✔
733
        new IllegalStateException(
734
            "attempting to write to stream " + streamId + " that no longer exists") {
1✔
735
          @Override
736
          public synchronized Throwable fillInStackTrace() {
737
            return this;
1✔
738
          }
739
        });
740
  }
1✔
741

742
  /** Sends the given gRPC frame to the client. */
743
  private void sendGrpcFrame(
744
      ChannelHandlerContext ctx, SendGrpcFrameCommand cmd, ChannelPromise promise)
745
      throws Http2Exception {
746
    try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.sendGrpcFrame")) {
1✔
747
      PerfMark.attachTag(cmd.stream().tag());
1✔
748
      PerfMark.linkIn(cmd.getLink());
1✔
749
      int streamId = cmd.stream().id();
1✔
750
      Http2Stream stream = connection().stream(streamId);
1✔
751
      if (stream == null) {
1✔
752
        streamGone(streamId, promise);
1✔
753
        return;
1✔
754
      }
755
      if (cmd.endStream()) {
1✔
756
        closeStreamWhenDone(promise, stream);
×
757
      }
758
      // Call the base class to write the HTTP/2 DATA frame.
759
      encoder().writeData(ctx, streamId, cmd.content(), 0, cmd.endStream(), promise);
1✔
760
    }
761
  }
1✔
762

763
  /**
764
   * Sends the response headers to the client.
765
   */
766
  private void sendResponseHeaders(ChannelHandlerContext ctx, SendResponseHeadersCommand cmd,
767
      ChannelPromise promise) throws Http2Exception {
768
    try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.sendResponseHeaders")) {
1✔
769
      PerfMark.attachTag(cmd.stream().tag());
1✔
770
      PerfMark.linkIn(cmd.getLink());
1✔
771
      int streamId = cmd.stream().id();
1✔
772
      Http2Stream stream = connection().stream(streamId);
1✔
773
      if (stream == null) {
1✔
774
        streamGone(streamId, promise);
1✔
775
        return;
1✔
776
      }
777
      if (cmd.endOfStream()) {
1✔
778
        closeStreamWhenDone(promise, stream);
1✔
779
      }
780
      encoder().writeHeaders(ctx, streamId, cmd.headers(), 0, cmd.endOfStream(), promise);
1✔
781
    }
782
  }
1✔
783

784
  private void cancelStream(ChannelHandlerContext ctx, CancelServerStreamCommand cmd,
785
      ChannelPromise promise) {
786
    try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.cancelStream")) {
1✔
787
      PerfMark.attachTag(cmd.stream().tag());
1✔
788
      PerfMark.linkIn(cmd.getLink());
1✔
789
      // Notify the listener if we haven't already.
790
      cmd.stream().transportReportStatus(cmd.reason());
1✔
791
      // Terminate the stream.
792
      encoder().writeRstStream(ctx, cmd.stream().id(), Http2Error.CANCEL.code(), promise);
1✔
793
    }
794
  }
1✔
795

796
  private void gracefulClose(final ChannelHandlerContext ctx, final GracefulServerCloseCommand msg,
797
      ChannelPromise promise) throws Exception {
798
    // Ideally we'd adjust a pre-existing graceful shutdown's grace period to at least what is
799
    // requested here. But that's an edge case and seems bug-prone.
800
    if (gracefulShutdown == null) {
1✔
801
      Long graceTimeInNanos = null;
1✔
802
      if (msg.getGraceTimeUnit() != null) {
1✔
803
        graceTimeInNanos = msg.getGraceTimeUnit().toNanos(msg.getGraceTime());
1✔
804
      }
805
      gracefulShutdown = new GracefulShutdown(msg.getGoAwayDebugString(), graceTimeInNanos);
1✔
806
      gracefulShutdown.start(ctx);
1✔
807
    }
808
    promise.setSuccess();
1✔
809
  }
1✔
810

811
  private void forcefulClose(final ChannelHandlerContext ctx, final ForcefulCloseCommand msg,
812
      ChannelPromise promise) throws Exception {
813
    super.close(ctx, promise);
1✔
814
    connection().forEachActiveStream(new Http2StreamVisitor() {
1✔
815
      @Override
816
      public boolean visit(Http2Stream stream) throws Http2Exception {
817
        NettyServerStream.TransportState serverStream = serverStream(stream);
1✔
818
        if (serverStream != null) {
1✔
819
          try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.forcefulClose")) {
1✔
820
            PerfMark.attachTag(serverStream.tag());
1✔
821
            PerfMark.linkIn(msg.getLink());
1✔
822
            serverStream.transportReportStatus(msg.getStatus());
1✔
823
            resetStream(ctx, stream.id(), Http2Error.CANCEL.code(), ctx.newPromise());
1✔
824
          }
825
        }
826
        stream.close();
1✔
827
        return true;
1✔
828
      }
829
    });
830
  }
1✔
831

832
  private void respondWithHttpError(
833
      ChannelHandlerContext ctx, int streamId, int code, Status.Code statusCode, String msg) {
834
    Metadata metadata = new Metadata();
1✔
835
    metadata.put(InternalStatus.CODE_KEY, statusCode.toStatus());
1✔
836
    metadata.put(InternalStatus.MESSAGE_KEY, msg);
1✔
837
    byte[][] serialized = InternalMetadata.serialize(metadata);
1✔
838

839
    Http2Headers headers = new DefaultHttp2Headers(true, serialized.length / 2)
1✔
840
        .status("" + code)
1✔
841
        .set(CONTENT_TYPE_HEADER, "text/plain; charset=utf-8");
1✔
842
    for (int i = 0; i < serialized.length; i += 2) {
1✔
843
      headers.add(new AsciiString(serialized[i], false), new AsciiString(serialized[i + 1], false));
1✔
844
    }
845
    encoder().writeHeaders(ctx, streamId, headers, 0, false, ctx.newPromise());
1✔
846
    ByteBuf msgBuf = ByteBufUtil.writeUtf8(ctx.alloc(), msg);
1✔
847
    encoder().writeData(ctx, streamId, msgBuf, 0, true, ctx.newPromise());
1✔
848
  }
1✔
849

850
  private Http2Stream requireHttp2Stream(int streamId) {
851
    Http2Stream stream = connection().stream(streamId);
1✔
852
    if (stream == null) {
1✔
853
      // This should never happen.
854
      throw new AssertionError("Stream does not exist: " + streamId);
×
855
    }
856
    return stream;
1✔
857
  }
858

859
  /**
860
   * Returns the server stream associated to the given HTTP/2 stream object.
861
   */
862
  private NettyServerStream.TransportState serverStream(Http2Stream stream) {
863
    return stream == null ? null : (NettyServerStream.TransportState) stream.getProperty(streamKey);
1✔
864
  }
865

866
  private Http2Exception newStreamException(int streamId, Throwable cause) {
867
    return Http2Exception.streamError(
1✔
868
        streamId, Http2Error.INTERNAL_ERROR, cause, Strings.nullToEmpty(cause.getMessage()));
1✔
869
  }
870

871
  private class FrameListener extends Http2FrameAdapter {
1✔
872
    private boolean firstSettings = true;
1✔
873

874
    @Override
875
    public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) {
876
      if (firstSettings) {
1✔
877
        firstSettings = false;
1✔
878
        // Delay transportReady until we see the client's HTTP handshake, for coverage with
879
        // handshakeTimeout
880
        attributes = transportListener.transportReady(negotiationAttributes);
1✔
881
      }
882
    }
1✔
883

884
    @Override
885
    public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
886
        boolean endOfStream) throws Http2Exception {
887
      if (keepAliveManager != null) {
1✔
888
        keepAliveManager.onDataReceived();
1✔
889
      }
890
      NettyServerHandler.this.onDataRead(streamId, data, padding, endOfStream);
1✔
891
      return padding;
1✔
892
    }
893

894
    @Override
895
    public void onHeadersRead(ChannelHandlerContext ctx,
896
        int streamId,
897
        Http2Headers headers,
898
        int streamDependency,
899
        short weight,
900
        boolean exclusive,
901
        int padding,
902
        boolean endStream) throws Http2Exception {
903
      if (keepAliveManager != null) {
1✔
904
        keepAliveManager.onDataReceived();
1✔
905
      }
906
      NettyServerHandler.this.onHeadersRead(ctx, streamId, headers);
1✔
907
      if (endStream) {
1✔
908
        NettyServerHandler.this.onDataRead(streamId, Unpooled.EMPTY_BUFFER, 0, endStream);
×
909
      }
910
    }
1✔
911

912
    @Override
913
    public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode)
914
        throws Http2Exception {
915
      if (keepAliveManager != null) {
1✔
916
        keepAliveManager.onDataReceived();
1✔
917
      }
918
      NettyServerHandler.this.onRstStreamRead(streamId, errorCode);
1✔
919
    }
1✔
920

921
    @Override
922
    public void onPingRead(ChannelHandlerContext ctx, long data) throws Http2Exception {
923
      if (keepAliveManager != null) {
1✔
924
        keepAliveManager.onDataReceived();
1✔
925
      }
926
      if (!keepAliveEnforcer.pingAcceptable()) {
1✔
927
        ByteBuf debugData = ByteBufUtil.writeAscii(ctx.alloc(), "too_many_pings");
1✔
928
        goAway(ctx, connection().remote().lastStreamCreated(), Http2Error.ENHANCE_YOUR_CALM.code(),
1✔
929
            debugData, ctx.newPromise());
1✔
930
        Status status = Status.RESOURCE_EXHAUSTED.withDescription("Too many pings from client");
1✔
931
        try {
932
          forcefulClose(ctx, new ForcefulCloseCommand(status), ctx.newPromise());
1✔
933
        } catch (Exception ex) {
×
934
          onError(ctx, /* outbound= */ true, ex);
×
935
        }
1✔
936
      }
937
    }
1✔
938

939
    @Override
940
    public void onPingAckRead(ChannelHandlerContext ctx, long data) throws Http2Exception {
941
      if (keepAliveManager != null) {
1✔
942
        keepAliveManager.onDataReceived();
1✔
943
      }
944
      if (data == flowControlPing().payload()) {
1✔
945
        flowControlPing().updateWindow();
1✔
946
        logger.log(Level.FINE, "Window: {0}",
1✔
947
            decoder().flowController().initialWindowSize(connection().connectionStream()));
1✔
948
      } else if (data == GRACEFUL_SHUTDOWN_PING) {
1✔
949
        if (gracefulShutdown == null) {
1✔
950
          // this should never happen
951
          logger.warning("Received GRACEFUL_SHUTDOWN_PING Ack but gracefulShutdown is null");
×
952
        } else {
953
          gracefulShutdown.secondGoAwayAndClose(ctx);
1✔
954
        }
955
      } else if (data != KEEPALIVE_PING) {
1✔
956
        logger.warning("Received unexpected ping ack. No ping outstanding");
×
957
      }
958
    }
1✔
959
  }
960

961
  private final class KeepAlivePinger implements KeepAliveManager.KeepAlivePinger {
962
    final ChannelHandlerContext ctx;
963

964
    KeepAlivePinger(ChannelHandlerContext ctx) {
1✔
965
      this.ctx = ctx;
1✔
966
    }
1✔
967

968
    @Override
969
    public void ping() {
970
      ChannelFuture pingFuture = encoder().writePing(
1✔
971
          ctx, false /* isAck */, KEEPALIVE_PING, ctx.newPromise());
1✔
972
      ctx.flush();
1✔
973
      pingFuture.addListener(new ChannelFutureListener() {
1✔
974
        @Override
975
        public void operationComplete(ChannelFuture future) throws Exception {
976
          if (future.isSuccess()) {
1✔
977
            transportTracer.reportKeepAliveSent();
1✔
978
          }
979
        }
1✔
980
      });
981
    }
1✔
982

983
    @Override
984
    public void onPingTimeout() {
985
      try {
986
        forcefulClose(
1✔
987
            ctx,
988
            new ForcefulCloseCommand(Status.UNAVAILABLE
989
                .withDescription("Keepalive failed. The connection is likely gone")),
1✔
990
            ctx.newPromise());
1✔
991
      } catch (Exception ex) {
×
992
        try {
993
          exceptionCaught(ctx, ex);
×
994
        } catch (Exception ex2) {
×
995
          logger.log(Level.WARNING, "Exception while propagating exception", ex2);
×
996
          logger.log(Level.WARNING, "Original failure", ex);
×
997
        }
×
998
      }
1✔
999
    }
1✔
1000
  }
1001

1002
  private final class GracefulShutdown {
1003
    String goAwayMessage;
1004

1005
    /**
1006
     * The grace time between starting graceful shutdown and closing the netty channel,
1007
     * {@code null} is unspecified.
1008
     */
1009
    @CheckForNull
1010
    Long graceTimeInNanos;
1011

1012
    /**
1013
     * True if ping is Acked or ping is timeout.
1014
     */
1015
    boolean pingAckedOrTimeout;
1016

1017
    Future<?> pingFuture;
1018

1019
    GracefulShutdown(String goAwayMessage,
1020
        @Nullable Long graceTimeInNanos) {
1✔
1021
      this.goAwayMessage = goAwayMessage;
1✔
1022
      this.graceTimeInNanos = graceTimeInNanos;
1✔
1023
    }
1✔
1024

1025
    /**
1026
     * Sends out first GOAWAY and ping, and schedules second GOAWAY and close.
1027
     */
1028
    void start(final ChannelHandlerContext ctx) {
1029
      goAway(
1✔
1030
          ctx,
1031
          Integer.MAX_VALUE,
1032
          Http2Error.NO_ERROR.code(),
1✔
1033
          ByteBufUtil.writeAscii(ctx.alloc(), goAwayMessage),
1✔
1034
          ctx.newPromise());
1✔
1035

1036
      pingFuture = ctx.executor().schedule(
1✔
1037
          new Runnable() {
1✔
1038
            @Override
1039
            public void run() {
1040
              secondGoAwayAndClose(ctx);
1✔
1041
            }
1✔
1042
          },
1043
          GRACEFUL_SHUTDOWN_PING_TIMEOUT_NANOS,
1✔
1044
          TimeUnit.NANOSECONDS);
1045

1046
      encoder().writePing(ctx, false /* isAck */, GRACEFUL_SHUTDOWN_PING, ctx.newPromise());
1✔
1047
    }
1✔
1048

1049
    void secondGoAwayAndClose(ChannelHandlerContext ctx) {
1050
      if (pingAckedOrTimeout) {
1✔
1051
        return;
×
1052
      }
1053
      pingAckedOrTimeout = true;
1✔
1054

1055
      checkNotNull(pingFuture, "pingFuture");
1✔
1056
      pingFuture.cancel(false);
1✔
1057

1058
      // send the second GOAWAY with last stream id
1059
      goAway(
1✔
1060
          ctx,
1061
          connection().remote().lastStreamCreated(),
1✔
1062
          Http2Error.NO_ERROR.code(),
1✔
1063
          ByteBufUtil.writeAscii(ctx.alloc(), goAwayMessage),
1✔
1064
          ctx.newPromise());
1✔
1065

1066
      // gracefully shutdown with specified grace time
1067
      long savedGracefulShutdownTimeMillis = gracefulShutdownTimeoutMillis();
1✔
1068
      long overriddenGraceTime = graceTimeOverrideMillis(savedGracefulShutdownTimeMillis);
1✔
1069
      try {
1070
        gracefulShutdownTimeoutMillis(overriddenGraceTime);
1✔
1071
        NettyServerHandler.super.close(ctx, ctx.newPromise());
1✔
1072
      } catch (Exception e) {
×
1073
        onError(ctx, /* outbound= */ true, e);
×
1074
      } finally {
1075
        gracefulShutdownTimeoutMillis(savedGracefulShutdownTimeMillis);
1✔
1076
      }
1077
    }
1✔
1078

1079
    private long graceTimeOverrideMillis(long originalMillis) {
1080
      if (graceTimeInNanos == null) {
1✔
1081
        return originalMillis;
1✔
1082
      }
1083
      if (graceTimeInNanos == MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE) {
1✔
1084
        // netty treats -1 as "no timeout"
1085
        return -1L;
1✔
1086
      }
1087
      return TimeUnit.NANOSECONDS.toMillis(graceTimeInNanos);
1✔
1088
    }
1089
  }
1090

1091
  // Use a frame writer so that we know when frames are through flow control and actually being
1092
  // written.
1093
  private static class WriteMonitoringFrameWriter extends DecoratingHttp2FrameWriter {
1094
    private final KeepAliveEnforcer keepAliveEnforcer;
1095

1096
    public WriteMonitoringFrameWriter(Http2FrameWriter delegate,
1097
        KeepAliveEnforcer keepAliveEnforcer) {
1098
      super(delegate);
1✔
1099
      this.keepAliveEnforcer = keepAliveEnforcer;
1✔
1100
    }
1✔
1101

1102
    @Override
1103
    public ChannelFuture writeData(ChannelHandlerContext ctx, int streamId, ByteBuf data,
1104
        int padding, boolean endStream, ChannelPromise promise) {
1105
      keepAliveEnforcer.resetCounters();
1✔
1106
      return super.writeData(ctx, streamId, data, padding, endStream, promise);
1✔
1107
    }
1108

1109
    @Override
1110
    public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
1111
        int padding, boolean endStream, ChannelPromise promise) {
1112
      keepAliveEnforcer.resetCounters();
1✔
1113
      return super.writeHeaders(ctx, streamId, headers, padding, endStream, promise);
1✔
1114
    }
1115

1116
    @Override
1117
    public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
1118
        int streamDependency, short weight, boolean exclusive, int padding, boolean endStream,
1119
        ChannelPromise promise) {
1120
      keepAliveEnforcer.resetCounters();
×
1121
      return super.writeHeaders(ctx, streamId, headers, streamDependency, weight, exclusive,
×
1122
          padding, endStream, promise);
1123
    }
1124
  }
1125

1126
  private static class ServerChannelLogger extends ChannelLogger {
1127
    private static final Logger log = Logger.getLogger(ChannelLogger.class.getName());
1✔
1128

1129
    @Override
1130
    public void log(ChannelLogLevel level, String message) {
1131
      log.log(toJavaLogLevel(level), message);
1✔
1132
    }
1✔
1133

1134
    @Override
1135
    public void log(ChannelLogLevel level, String messageFormat, Object... args) {
1136
      log(level, MessageFormat.format(messageFormat, args));
1✔
1137
    }
1✔
1138
  }
1139

1140
  private static Level toJavaLogLevel(ChannelLogLevel level) {
1141
    switch (level) {
1✔
1142
      case ERROR:
1143
        return Level.FINE;
×
1144
      case WARNING:
1145
        return Level.FINER;
×
1146
      default:
1147
        return Level.FINEST;
1✔
1148
    }
1149
  }
1150
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc