• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19159

08 Apr 2024 07:52PM UTC coverage: 88.154% (+0.02%) from 88.134%
#19159

push

github

web-flow
buildscripts: Migrate PSM Interop to Artifact Registry (#11079) (#11095)

From Container Registry (gcr.io) to Artifact Registry (pkg.dev).

30839 of 34983 relevant lines covered (88.15%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.77
/../netty/src/main/java/io/grpc/netty/NettyServerHandler.java
1
/*
2
 * Copyright 2014 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.netty;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static io.grpc.internal.GrpcUtil.SERVER_KEEPALIVE_TIME_NANOS_DISABLED;
22
import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE;
23
import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_AGE_NANOS_DISABLED;
24
import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_IDLE_NANOS_DISABLED;
25
import static io.grpc.netty.Utils.CONTENT_TYPE_HEADER;
26
import static io.grpc.netty.Utils.HTTP_METHOD;
27
import static io.grpc.netty.Utils.TE_HEADER;
28
import static io.grpc.netty.Utils.TE_TRAILERS;
29
import static io.netty.handler.codec.http.HttpHeaderNames.CONNECTION;
30
import static io.netty.handler.codec.http.HttpHeaderNames.HOST;
31
import static io.netty.handler.codec.http2.DefaultHttp2LocalFlowController.DEFAULT_WINDOW_UPDATE_RATIO;
32
import static io.netty.handler.codec.http2.Http2Headers.PseudoHeaderName.AUTHORITY;
33

34
import com.google.common.annotations.VisibleForTesting;
35
import com.google.common.base.Preconditions;
36
import com.google.common.base.Strings;
37
import com.google.common.base.Ticker;
38
import io.grpc.Attributes;
39
import io.grpc.ChannelLogger;
40
import io.grpc.ChannelLogger.ChannelLogLevel;
41
import io.grpc.InternalChannelz;
42
import io.grpc.InternalMetadata;
43
import io.grpc.InternalStatus;
44
import io.grpc.Metadata;
45
import io.grpc.ServerStreamTracer;
46
import io.grpc.Status;
47
import io.grpc.internal.GrpcUtil;
48
import io.grpc.internal.KeepAliveEnforcer;
49
import io.grpc.internal.KeepAliveManager;
50
import io.grpc.internal.LogExceptionRunnable;
51
import io.grpc.internal.MaxConnectionIdleManager;
52
import io.grpc.internal.ServerTransportListener;
53
import io.grpc.internal.StatsTraceContext;
54
import io.grpc.internal.TransportTracer;
55
import io.grpc.netty.GrpcHttp2HeadersUtils.GrpcHttp2ServerHeadersDecoder;
56
import io.netty.buffer.ByteBuf;
57
import io.netty.buffer.ByteBufUtil;
58
import io.netty.buffer.Unpooled;
59
import io.netty.channel.ChannelFuture;
60
import io.netty.channel.ChannelFutureListener;
61
import io.netty.channel.ChannelHandlerContext;
62
import io.netty.channel.ChannelPromise;
63
import io.netty.handler.codec.http2.DecoratingHttp2FrameWriter;
64
import io.netty.handler.codec.http2.DefaultHttp2Connection;
65
import io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder;
66
import io.netty.handler.codec.http2.DefaultHttp2ConnectionEncoder;
67
import io.netty.handler.codec.http2.DefaultHttp2FrameReader;
68
import io.netty.handler.codec.http2.DefaultHttp2FrameWriter;
69
import io.netty.handler.codec.http2.DefaultHttp2Headers;
70
import io.netty.handler.codec.http2.DefaultHttp2LocalFlowController;
71
import io.netty.handler.codec.http2.DefaultHttp2RemoteFlowController;
72
import io.netty.handler.codec.http2.Http2Connection;
73
import io.netty.handler.codec.http2.Http2ConnectionAdapter;
74
import io.netty.handler.codec.http2.Http2ConnectionDecoder;
75
import io.netty.handler.codec.http2.Http2ConnectionEncoder;
76
import io.netty.handler.codec.http2.Http2Error;
77
import io.netty.handler.codec.http2.Http2Exception;
78
import io.netty.handler.codec.http2.Http2Exception.StreamException;
79
import io.netty.handler.codec.http2.Http2FrameAdapter;
80
import io.netty.handler.codec.http2.Http2FrameLogger;
81
import io.netty.handler.codec.http2.Http2FrameReader;
82
import io.netty.handler.codec.http2.Http2FrameWriter;
83
import io.netty.handler.codec.http2.Http2Headers;
84
import io.netty.handler.codec.http2.Http2HeadersDecoder;
85
import io.netty.handler.codec.http2.Http2InboundFrameLogger;
86
import io.netty.handler.codec.http2.Http2OutboundFrameLogger;
87
import io.netty.handler.codec.http2.Http2Settings;
88
import io.netty.handler.codec.http2.Http2Stream;
89
import io.netty.handler.codec.http2.Http2StreamVisitor;
90
import io.netty.handler.codec.http2.WeightedFairQueueByteDistributor;
91
import io.netty.handler.logging.LogLevel;
92
import io.netty.util.AsciiString;
93
import io.netty.util.ReferenceCountUtil;
94
import io.perfmark.PerfMark;
95
import io.perfmark.Tag;
96
import io.perfmark.TaskCloseable;
97
import java.text.MessageFormat;
98
import java.util.List;
99
import java.util.concurrent.Future;
100
import java.util.concurrent.ScheduledFuture;
101
import java.util.concurrent.TimeUnit;
102
import java.util.logging.Level;
103
import java.util.logging.Logger;
104
import javax.annotation.CheckForNull;
105
import javax.annotation.Nullable;
106

107
/**
108
 * Server-side Netty handler for GRPC processing. All event handlers are executed entirely within
109
 * the context of the Netty Channel thread.
110
 */
111
class NettyServerHandler extends AbstractNettyHandler {
112
  private static final Logger logger = Logger.getLogger(NettyServerHandler.class.getName());
1✔
113
  private static final long KEEPALIVE_PING = 0xDEADL;
114
  @VisibleForTesting
115
  static final long GRACEFUL_SHUTDOWN_PING = 0x97ACEF001L;
116
  private static final long GRACEFUL_SHUTDOWN_PING_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(10);
1✔
117
  /** Temporary workaround for #8674. Fine to delete after v1.45 release, and maybe earlier. */
118
  private static final boolean DISABLE_CONNECTION_HEADER_CHECK = Boolean.parseBoolean(
1✔
119
      System.getProperty("io.grpc.netty.disableConnectionHeaderCheck", "false"));
1✔
120

121
  private final Http2Connection.PropertyKey streamKey;
122
  private final ServerTransportListener transportListener;
123
  private final int maxMessageSize;
124
  private final long keepAliveTimeInNanos;
125
  private final long keepAliveTimeoutInNanos;
126
  private final long maxConnectionAgeInNanos;
127
  private final long maxConnectionAgeGraceInNanos;
128
  private final List<? extends ServerStreamTracer.Factory> streamTracerFactories;
129
  private final TransportTracer transportTracer;
130
  private final KeepAliveEnforcer keepAliveEnforcer;
131
  private final Attributes eagAttributes;
132
  /** Incomplete attributes produced by negotiator. */
133
  private Attributes negotiationAttributes;
134
  private InternalChannelz.Security securityInfo;
135
  /** Completed attributes produced by transportReady. */
136
  private Attributes attributes;
137
  private Throwable connectionError;
138
  private boolean teWarningLogged;
139
  private WriteQueue serverWriteQueue;
140
  private AsciiString lastKnownAuthority;
141
  @CheckForNull
142
  private KeepAliveManager keepAliveManager;
143
  @CheckForNull
144
  private MaxConnectionIdleManager maxConnectionIdleManager;
145
  @CheckForNull
146
  private ScheduledFuture<?> maxConnectionAgeMonitor;
147
  @CheckForNull
148
  private GracefulShutdown gracefulShutdown;
149

150
  static NettyServerHandler newHandler(
151
      ServerTransportListener transportListener,
152
      ChannelPromise channelUnused,
153
      List<? extends ServerStreamTracer.Factory> streamTracerFactories,
154
      TransportTracer transportTracer,
155
      int maxStreams,
156
      boolean autoFlowControl,
157
      int flowControlWindow,
158
      int maxHeaderListSize,
159
      int maxMessageSize,
160
      long keepAliveTimeInNanos,
161
      long keepAliveTimeoutInNanos,
162
      long maxConnectionIdleInNanos,
163
      long maxConnectionAgeInNanos,
164
      long maxConnectionAgeGraceInNanos,
165
      boolean permitKeepAliveWithoutCalls,
166
      long permitKeepAliveTimeInNanos,
167
      Attributes eagAttributes) {
168
    Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive: %s",
1✔
169
        maxHeaderListSize);
170
    Http2FrameLogger frameLogger = new Http2FrameLogger(LogLevel.DEBUG, NettyServerHandler.class);
1✔
171
    Http2HeadersDecoder headersDecoder = new GrpcHttp2ServerHeadersDecoder(maxHeaderListSize);
1✔
172
    Http2FrameReader frameReader = new Http2InboundFrameLogger(
1✔
173
        new DefaultHttp2FrameReader(headersDecoder), frameLogger);
174
    Http2FrameWriter frameWriter =
1✔
175
        new Http2OutboundFrameLogger(new DefaultHttp2FrameWriter(), frameLogger);
176
    return newHandler(
1✔
177
        channelUnused,
178
        frameReader,
179
        frameWriter,
180
        transportListener,
181
        streamTracerFactories,
182
        transportTracer,
183
        maxStreams,
184
        autoFlowControl,
185
        flowControlWindow,
186
        maxHeaderListSize,
187
        maxMessageSize,
188
        keepAliveTimeInNanos,
189
        keepAliveTimeoutInNanos,
190
        maxConnectionIdleInNanos,
191
        maxConnectionAgeInNanos,
192
        maxConnectionAgeGraceInNanos,
193
        permitKeepAliveWithoutCalls,
194
        permitKeepAliveTimeInNanos,
195
        eagAttributes,
196
        Ticker.systemTicker());
1✔
197
  }
198

199
  static NettyServerHandler newHandler(
200
      ChannelPromise channelUnused,
201
      Http2FrameReader frameReader,
202
      Http2FrameWriter frameWriter,
203
      ServerTransportListener transportListener,
204
      List<? extends ServerStreamTracer.Factory> streamTracerFactories,
205
      TransportTracer transportTracer,
206
      int maxStreams,
207
      boolean autoFlowControl,
208
      int flowControlWindow,
209
      int maxHeaderListSize,
210
      int maxMessageSize,
211
      long keepAliveTimeInNanos,
212
      long keepAliveTimeoutInNanos,
213
      long maxConnectionIdleInNanos,
214
      long maxConnectionAgeInNanos,
215
      long maxConnectionAgeGraceInNanos,
216
      boolean permitKeepAliveWithoutCalls,
217
      long permitKeepAliveTimeInNanos,
218
      Attributes eagAttributes,
219
      Ticker ticker) {
220
    Preconditions.checkArgument(maxStreams > 0, "maxStreams must be positive: %s", maxStreams);
1✔
221
    Preconditions.checkArgument(flowControlWindow > 0, "flowControlWindow must be positive: %s",
1✔
222
        flowControlWindow);
223
    Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive: %s",
1✔
224
        maxHeaderListSize);
225
    Preconditions.checkArgument(maxMessageSize > 0, "maxMessageSize must be positive: %s",
1✔
226
        maxMessageSize);
227

228
    final Http2Connection connection = new DefaultHttp2Connection(true);
1✔
229
    WeightedFairQueueByteDistributor dist = new WeightedFairQueueByteDistributor(connection);
1✔
230
    dist.allocationQuantum(16 * 1024); // Make benchmarks fast again.
1✔
231
    DefaultHttp2RemoteFlowController controller =
1✔
232
        new DefaultHttp2RemoteFlowController(connection, dist);
233
    connection.remote().flowController(controller);
1✔
234
    final KeepAliveEnforcer keepAliveEnforcer = new KeepAliveEnforcer(
1✔
235
        permitKeepAliveWithoutCalls, permitKeepAliveTimeInNanos, TimeUnit.NANOSECONDS);
236

237
    // Create the local flow controller configured to auto-refill the connection window.
238
    connection.local().flowController(
1✔
239
        new DefaultHttp2LocalFlowController(connection, DEFAULT_WINDOW_UPDATE_RATIO, true));
240
    frameWriter = new WriteMonitoringFrameWriter(frameWriter, keepAliveEnforcer);
1✔
241
    Http2ConnectionEncoder encoder =
1✔
242
        new DefaultHttp2ConnectionEncoder(connection, frameWriter);
243
    encoder = new Http2ControlFrameLimitEncoder(encoder, 10000);
1✔
244
    Http2ConnectionDecoder decoder = new DefaultHttp2ConnectionDecoder(connection, encoder,
1✔
245
        frameReader);
246

247
    Http2Settings settings = new Http2Settings();
1✔
248
    settings.initialWindowSize(flowControlWindow);
1✔
249
    settings.maxConcurrentStreams(maxStreams);
1✔
250
    settings.maxHeaderListSize(maxHeaderListSize);
1✔
251

252
    if (ticker == null) {
1✔
253
      ticker = Ticker.systemTicker();
×
254
    }
255

256
    return new NettyServerHandler(
1✔
257
        channelUnused,
258
        connection,
259
        transportListener,
260
        streamTracerFactories,
261
        transportTracer,
262
        decoder, encoder, settings,
263
        maxMessageSize,
264
        keepAliveTimeInNanos, keepAliveTimeoutInNanos,
265
        maxConnectionIdleInNanos,
266
        maxConnectionAgeInNanos, maxConnectionAgeGraceInNanos,
267
        keepAliveEnforcer,
268
        autoFlowControl,
269
        eagAttributes, ticker);
270
  }
271

272
  private NettyServerHandler(
273
      ChannelPromise channelUnused,
274
      final Http2Connection connection,
275
      ServerTransportListener transportListener,
276
      List<? extends ServerStreamTracer.Factory> streamTracerFactories,
277
      TransportTracer transportTracer,
278
      Http2ConnectionDecoder decoder,
279
      Http2ConnectionEncoder encoder,
280
      Http2Settings settings,
281
      int maxMessageSize,
282
      long keepAliveTimeInNanos,
283
      long keepAliveTimeoutInNanos,
284
      long maxConnectionIdleInNanos,
285
      long maxConnectionAgeInNanos,
286
      long maxConnectionAgeGraceInNanos,
287
      final KeepAliveEnforcer keepAliveEnforcer,
288
      boolean autoFlowControl,
289
      Attributes eagAttributes,
290
      Ticker ticker) {
291
    super(channelUnused, decoder, encoder, settings, new ServerChannelLogger(),
1✔
292
        autoFlowControl, null, ticker);
293

294
    final MaxConnectionIdleManager maxConnectionIdleManager;
295
    if (maxConnectionIdleInNanos == MAX_CONNECTION_IDLE_NANOS_DISABLED) {
1✔
296
      maxConnectionIdleManager = null;
1✔
297
    } else {
298
      maxConnectionIdleManager = new MaxConnectionIdleManager(maxConnectionIdleInNanos);
1✔
299
    }
300

301
    connection.addListener(new Http2ConnectionAdapter() {
1✔
302
      @Override
303
      public void onStreamActive(Http2Stream stream) {
304
        if (connection.numActiveStreams() == 1) {
1✔
305
          keepAliveEnforcer.onTransportActive();
1✔
306
          if (maxConnectionIdleManager != null) {
1✔
307
            maxConnectionIdleManager.onTransportActive();
1✔
308
          }
309
        }
310
      }
1✔
311

312
      @Override
313
      public void onStreamClosed(Http2Stream stream) {
314
        if (connection.numActiveStreams() == 0) {
1✔
315
          keepAliveEnforcer.onTransportIdle();
1✔
316
          if (maxConnectionIdleManager != null) {
1✔
317
            maxConnectionIdleManager.onTransportIdle();
1✔
318
          }
319
        }
320
      }
1✔
321
    });
322

323
    checkArgument(maxMessageSize >= 0, "maxMessageSize must be non-negative: %s", maxMessageSize);
1✔
324
    this.maxMessageSize = maxMessageSize;
1✔
325
    this.keepAliveTimeInNanos = keepAliveTimeInNanos;
1✔
326
    this.keepAliveTimeoutInNanos = keepAliveTimeoutInNanos;
1✔
327
    this.maxConnectionIdleManager = maxConnectionIdleManager;
1✔
328
    this.maxConnectionAgeInNanos = maxConnectionAgeInNanos;
1✔
329
    this.maxConnectionAgeGraceInNanos = maxConnectionAgeGraceInNanos;
1✔
330
    this.keepAliveEnforcer = checkNotNull(keepAliveEnforcer, "keepAliveEnforcer");
1✔
331
    this.eagAttributes = checkNotNull(eagAttributes, "eagAttributes");
1✔
332

333
    streamKey = encoder.connection().newKey();
1✔
334
    this.transportListener = checkNotNull(transportListener, "transportListener");
1✔
335
    this.streamTracerFactories = checkNotNull(streamTracerFactories, "streamTracerFactories");
1✔
336
    this.transportTracer = checkNotNull(transportTracer, "transportTracer");
1✔
337
    // Set the frame listener on the decoder.
338
    decoder().frameListener(new FrameListener());
1✔
339
  }
1✔
340

341
  @Nullable
342
  Throwable connectionError() {
343
    return connectionError;
1✔
344
  }
345

346
  @Override
347
  public void handlerAdded(final ChannelHandlerContext ctx) throws Exception {
348
    serverWriteQueue = new WriteQueue(ctx.channel());
1✔
349

350
    // init max connection age monitor
351
    if (maxConnectionAgeInNanos != MAX_CONNECTION_AGE_NANOS_DISABLED) {
1✔
352
      maxConnectionAgeMonitor = ctx.executor().schedule(
1✔
353
          new LogExceptionRunnable(new Runnable() {
1✔
354
            @Override
355
            public void run() {
356
              if (gracefulShutdown == null) {
1✔
357
                gracefulShutdown = new GracefulShutdown("max_age", maxConnectionAgeGraceInNanos);
1✔
358
                gracefulShutdown.start(ctx);
1✔
359
                ctx.flush();
1✔
360
              }
361
            }
1✔
362
          }),
363
          maxConnectionAgeInNanos,
364
          TimeUnit.NANOSECONDS);
365
    }
366

367
    if (maxConnectionIdleManager != null) {
1✔
368
      maxConnectionIdleManager.start(new Runnable() {
1✔
369
        @Override
370
        public void run() {
371
          if (gracefulShutdown == null) {
1✔
372
            gracefulShutdown = new GracefulShutdown("max_idle", null);
1✔
373
            gracefulShutdown.start(ctx);
1✔
374
            ctx.flush();
1✔
375
          }
376
        }
1✔
377
      }, ctx.executor());
1✔
378
    }
379

380
    if (keepAliveTimeInNanos != SERVER_KEEPALIVE_TIME_NANOS_DISABLED) {
1✔
381
      keepAliveManager = new KeepAliveManager(new KeepAlivePinger(ctx), ctx.executor(),
1✔
382
          keepAliveTimeInNanos, keepAliveTimeoutInNanos, true /* keepAliveDuringTransportIdle */);
383
      keepAliveManager.onTransportStarted();
1✔
384
    }
385

386
    assert encoder().connection().equals(decoder().connection());
1✔
387
    transportTracer.setFlowControlWindowReader(new Utils.FlowControlReader(encoder().connection()));
1✔
388

389
    super.handlerAdded(ctx);
1✔
390
  }
1✔
391

392
  private void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers)
393
      throws Http2Exception {
394
    try {
395
      // Connection-specific header fields makes a request malformed. Ideally this would be handled
396
      // by Netty. RFC 7540 section 8.1.2.2
397
      if (!DISABLE_CONNECTION_HEADER_CHECK && headers.contains(CONNECTION)) {
1✔
398
        resetStream(ctx, streamId, Http2Error.PROTOCOL_ERROR.code(), ctx.newPromise());
×
399
        return;
×
400
      }
401

402
      if (headers.authority() == null) {
1✔
403
        List<CharSequence> hosts = headers.getAll(HOST);
1✔
404
        if (hosts.size() > 1) {
1✔
405
          // RFC 7230 section 5.4
406
          respondWithHttpError(ctx, streamId, 400, Status.Code.INTERNAL,
1✔
407
              "Multiple host headers");
408
          return;
1✔
409
        }
410
        if (!hosts.isEmpty()) {
1✔
411
          headers.add(AUTHORITY.value(), hosts.get(0));
1✔
412
        }
413
      }
414
      headers.remove(HOST);
1✔
415

416
      // Remove the leading slash of the path and get the fully qualified method name
417
      CharSequence path = headers.path();
1✔
418

419
      if (path == null) {
1✔
420
        respondWithHttpError(ctx, streamId, 404, Status.Code.UNIMPLEMENTED,
1✔
421
            "Expected path but is missing");
422
        return;
1✔
423
      }
424

425
      if (path.charAt(0) != '/') {
1✔
426
        respondWithHttpError(ctx, streamId, 404, Status.Code.UNIMPLEMENTED,
1✔
427
            String.format("Expected path to start with /: %s", path));
1✔
428
        return;
1✔
429
      }
430

431
      String method = path.subSequence(1, path.length()).toString();
1✔
432

433
      // Verify that the Content-Type is correct in the request.
434
      CharSequence contentType = headers.get(CONTENT_TYPE_HEADER);
1✔
435
      if (contentType == null) {
1✔
436
        respondWithHttpError(
×
437
            ctx, streamId, 415, Status.Code.INTERNAL, "Content-Type is missing from the request");
438
        return;
×
439
      }
440
      String contentTypeString = contentType.toString();
1✔
441
      if (!GrpcUtil.isGrpcContentType(contentTypeString)) {
1✔
442
        respondWithHttpError(ctx, streamId, 415, Status.Code.INTERNAL,
1✔
443
            String.format("Content-Type '%s' is not supported", contentTypeString));
1✔
444
        return;
1✔
445
      }
446

447
      if (!HTTP_METHOD.contentEquals(headers.method())) {
1✔
448
        respondWithHttpError(ctx, streamId, 405, Status.Code.INTERNAL,
1✔
449
            String.format("Method '%s' is not supported", headers.method()));
1✔
450
        return;
1✔
451
      }
452

453
      if (!teWarningLogged && !TE_TRAILERS.contentEquals(headers.get(TE_HEADER))) {
1✔
454
        logger.warning(String.format("Expected header TE: %s, but %s is received. This means "
1✔
455
                + "some intermediate proxy may not support trailers",
456
            TE_TRAILERS, headers.get(TE_HEADER)));
1✔
457
        teWarningLogged = true;
1✔
458
      }
459

460
      // The Http2Stream object was put by AbstractHttp2ConnectionHandler before calling this
461
      // method.
462
      Http2Stream http2Stream = requireHttp2Stream(streamId);
1✔
463

464
      Metadata metadata = Utils.convertHeaders(headers);
1✔
465
      StatsTraceContext statsTraceCtx =
1✔
466
          StatsTraceContext.newServerContext(streamTracerFactories, method, metadata);
1✔
467

468
      NettyServerStream.TransportState state = new NettyServerStream.TransportState(
1✔
469
          this,
470
          ctx.channel().eventLoop(),
1✔
471
          http2Stream,
472
          maxMessageSize,
473
          statsTraceCtx,
474
          transportTracer,
475
          method);
476

477
      try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.onHeadersRead")) {
1✔
478
        PerfMark.attachTag(state.tag());
1✔
479
        String authority = getOrUpdateAuthority((AsciiString) headers.authority());
1✔
480
        NettyServerStream stream = new NettyServerStream(
1✔
481
            ctx.channel(),
1✔
482
            state,
483
            attributes,
484
            authority,
485
            statsTraceCtx,
486
            transportTracer);
487
        transportListener.streamCreated(stream, method, metadata);
1✔
488
        state.onStreamAllocated();
1✔
489
        http2Stream.setProperty(streamKey, state);
1✔
490
      }
491
    } catch (Exception e) {
×
492
      logger.log(Level.WARNING, "Exception in onHeadersRead()", e);
×
493
      // Throw an exception that will get handled by onStreamError.
494
      throw newStreamException(streamId, e);
×
495
    }
1✔
496
  }
1✔
497

498
  private String getOrUpdateAuthority(AsciiString authority) {
499
    if (authority == null) {
1✔
500
      return null;
1✔
501
    } else if (!authority.equals(lastKnownAuthority)) {
1✔
502
      lastKnownAuthority = authority;
1✔
503
    }
504

505
    // AsciiString.toString() is internally cached, so subsequent calls will not
506
    // result in recomputing the String representation of lastKnownAuthority.
507
    return lastKnownAuthority.toString();
1✔
508
  }
509

510
  private void onDataRead(int streamId, ByteBuf data, int padding, boolean endOfStream)
511
      throws Http2Exception {
512
    flowControlPing().onDataRead(data.readableBytes(), padding);
1✔
513
    try {
514
      NettyServerStream.TransportState stream = serverStream(requireHttp2Stream(streamId));
1✔
515
      try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.onDataRead")) {
1✔
516
        PerfMark.attachTag(stream.tag());
1✔
517
        stream.inboundDataReceived(data, endOfStream);
1✔
518
      }
519
    } catch (Throwable e) {
×
520
      logger.log(Level.WARNING, "Exception in onDataRead()", e);
×
521
      // Throw an exception that will get handled by onStreamError.
522
      throw newStreamException(streamId, e);
×
523
    }
1✔
524
  }
1✔
525

526
  private void onRstStreamRead(int streamId, long errorCode) throws Http2Exception {
527
    try {
528
      NettyServerStream.TransportState stream = serverStream(connection().stream(streamId));
1✔
529
      if (stream != null) {
1✔
530
        try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.onRstStreamRead")) {
1✔
531
          PerfMark.attachTag(stream.tag());
1✔
532
          stream.transportReportStatus(
1✔
533
              Status.CANCELLED.withDescription("RST_STREAM received for code " + errorCode));
1✔
534
        }
535
      }
536
    } catch (Throwable e) {
×
537
      logger.log(Level.WARNING, "Exception in onRstStreamRead()", e);
×
538
      // Throw an exception that will get handled by onStreamError.
539
      throw newStreamException(streamId, e);
×
540
    }
1✔
541
  }
1✔
542

543
  @Override
544
  protected void onConnectionError(ChannelHandlerContext ctx, boolean outbound, Throwable cause,
545
      Http2Exception http2Ex) {
546
    logger.log(Level.FINE, "Connection Error", cause);
1✔
547
    connectionError = cause;
1✔
548
    super.onConnectionError(ctx, outbound, cause, http2Ex);
1✔
549
  }
1✔
550

551
  @Override
552
  protected void onStreamError(ChannelHandlerContext ctx, boolean outbound, Throwable cause,
553
      StreamException http2Ex) {
554
    NettyServerStream.TransportState serverStream = serverStream(
1✔
555
        connection().stream(Http2Exception.streamId(http2Ex)));
1✔
556
    Level level = Level.WARNING;
1✔
557
    if (serverStream == null && http2Ex.error() == Http2Error.STREAM_CLOSED) {
1✔
558
      level = Level.FINE;
×
559
    }
560
    logger.log(level, "Stream Error", cause);
1✔
561
    Tag tag = serverStream != null ? serverStream.tag() : PerfMark.createTag();
1✔
562
    try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.onStreamError")) {
1✔
563
      PerfMark.attachTag(tag);
1✔
564
      if (serverStream != null) {
1✔
565
        serverStream.transportReportStatus(Utils.statusFromThrowable(cause));
1✔
566
      }
567
      // TODO(ejona): Abort the stream by sending headers to help the client with debugging.
568
      // Delegate to the base class to send a RST_STREAM.
569
      super.onStreamError(ctx, outbound, cause, http2Ex);
1✔
570
    }
571
  }
1✔
572

573
  @Override
574
  public void handleProtocolNegotiationCompleted(
575
      Attributes attrs, InternalChannelz.Security securityInfo) {
576
    negotiationAttributes = attrs;
1✔
577
    this.securityInfo = securityInfo;
1✔
578
    super.handleProtocolNegotiationCompleted(attrs, securityInfo);
1✔
579
    NettyClientHandler.writeBufferingAndRemove(ctx().channel());
1✔
580
  }
1✔
581

582
  @Override
583
  public Attributes getEagAttributes() {
584
    return eagAttributes;
1✔
585
  }
586

587
  InternalChannelz.Security getSecurityInfo() {
588
    return securityInfo;
1✔
589
  }
590

591
  @VisibleForTesting
592
  KeepAliveManager getKeepAliveManagerForTest() {
593
    return keepAliveManager;
1✔
594
  }
595

596
  @VisibleForTesting
597
  void setKeepAliveManagerForTest(KeepAliveManager keepAliveManager) {
598
    this.keepAliveManager = keepAliveManager;
1✔
599
  }
1✔
600

601
  /**
602
   * Handler for the Channel shutting down.
603
   */
604
  @Override
605
  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
606
    try {
607
      if (keepAliveManager != null) {
1✔
608
        keepAliveManager.onTransportTermination();
1✔
609
      }
610
      if (maxConnectionIdleManager != null) {
1✔
611
        maxConnectionIdleManager.onTransportTermination();
1✔
612
      }
613
      if (maxConnectionAgeMonitor != null) {
1✔
614
        maxConnectionAgeMonitor.cancel(false);
1✔
615
      }
616
      final Status status =
1✔
617
          Status.UNAVAILABLE.withDescription("connection terminated for unknown reason");
1✔
618
      // Any streams that are still active must be closed
619
      connection().forEachActiveStream(new Http2StreamVisitor() {
1✔
620
        @Override
621
        public boolean visit(Http2Stream stream) throws Http2Exception {
622
          NettyServerStream.TransportState serverStream = serverStream(stream);
1✔
623
          if (serverStream != null) {
1✔
624
            serverStream.transportReportStatus(status);
1✔
625
          }
626
          return true;
1✔
627
        }
628
      });
629
    } finally {
630
      super.channelInactive(ctx);
1✔
631
    }
632
  }
1✔
633

634
  WriteQueue getWriteQueue() {
635
    return serverWriteQueue;
1✔
636
  }
637

638
  /**
639
   * Handler for commands sent from the stream.
640
   */
641
  @Override
642
  public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
643
      throws Exception {
644
    if (msg instanceof SendGrpcFrameCommand) {
1✔
645
      sendGrpcFrame(ctx, (SendGrpcFrameCommand) msg, promise);
1✔
646
    } else if (msg instanceof SendResponseHeadersCommand) {
1✔
647
      sendResponseHeaders(ctx, (SendResponseHeadersCommand) msg, promise);
1✔
648
    } else if (msg instanceof CancelServerStreamCommand) {
1✔
649
      cancelStream(ctx, (CancelServerStreamCommand) msg, promise);
1✔
650
    } else if (msg instanceof GracefulServerCloseCommand) {
1✔
651
      gracefulClose(ctx, (GracefulServerCloseCommand) msg, promise);
1✔
652
    } else if (msg instanceof ForcefulCloseCommand) {
1✔
653
      forcefulClose(ctx, (ForcefulCloseCommand) msg, promise);
1✔
654
    } else {
655
      AssertionError e =
×
656
          new AssertionError("Write called for unexpected type: " + msg.getClass().getName());
×
657
      ReferenceCountUtil.release(msg);
×
658
      promise.setFailure(e);
×
659
      throw e;
×
660
    }
661
  }
1✔
662

663
  @Override
664
  public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
665
    gracefulClose(ctx, new GracefulServerCloseCommand("app_requested"), promise);
1✔
666
    ctx.flush();
1✔
667
  }
1✔
668

669
  /**
670
   * Returns the given processed bytes back to inbound flow control.
671
   */
672
  void returnProcessedBytes(Http2Stream http2Stream, int bytes) {
673
    try {
674
      decoder().flowController().consumeBytes(http2Stream, bytes);
1✔
675
    } catch (Http2Exception e) {
×
676
      throw new RuntimeException(e);
×
677
    }
1✔
678
  }
1✔
679

680
  private void closeStreamWhenDone(ChannelPromise promise, int streamId) throws Http2Exception {
681
    final NettyServerStream.TransportState stream = serverStream(requireHttp2Stream(streamId));
1✔
682
    promise.addListener(new ChannelFutureListener() {
1✔
683
      @Override
684
      public void operationComplete(ChannelFuture future) {
685
        stream.complete();
1✔
686
      }
1✔
687
    });
688
  }
1✔
689

690
  /**
691
   * Sends the given gRPC frame to the client.
692
   */
693
  private void sendGrpcFrame(ChannelHandlerContext ctx, SendGrpcFrameCommand cmd,
694
      ChannelPromise promise) throws Http2Exception {
695
    try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.sendGrpcFrame")) {
1✔
696
      PerfMark.attachTag(cmd.stream().tag());
1✔
697
      PerfMark.linkIn(cmd.getLink());
1✔
698
      if (cmd.endStream()) {
1✔
699
        closeStreamWhenDone(promise, cmd.stream().id());
×
700
      }
701
      // Call the base class to write the HTTP/2 DATA frame.
702
      encoder().writeData(ctx, cmd.stream().id(), cmd.content(), 0, cmd.endStream(), promise);
1✔
703
    }
704
  }
1✔
705

706
  /**
707
   * Sends the response headers to the client.
708
   */
709
  private void sendResponseHeaders(ChannelHandlerContext ctx, SendResponseHeadersCommand cmd,
710
      ChannelPromise promise) throws Http2Exception {
711
    try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.sendResponseHeaders")) {
1✔
712
      PerfMark.attachTag(cmd.stream().tag());
1✔
713
      PerfMark.linkIn(cmd.getLink());
1✔
714
      // TODO(carl-mastrangelo): remove this check once https://github.com/netty/netty/issues/6296
715
      // is fixed.
716
      int streamId = cmd.stream().id();
1✔
717
      Http2Stream stream = connection().stream(streamId);
1✔
718
      if (stream == null) {
1✔
719
        resetStream(ctx, streamId, Http2Error.CANCEL.code(), promise);
1✔
720
        return;
1✔
721
      }
722
      if (cmd.endOfStream()) {
1✔
723
        closeStreamWhenDone(promise, streamId);
1✔
724
      }
725
      encoder().writeHeaders(ctx, streamId, cmd.headers(), 0, cmd.endOfStream(), promise);
1✔
726
    }
727
  }
1✔
728

729
  private void cancelStream(ChannelHandlerContext ctx, CancelServerStreamCommand cmd,
730
      ChannelPromise promise) {
731
    try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.cancelStream")) {
1✔
732
      PerfMark.attachTag(cmd.stream().tag());
1✔
733
      PerfMark.linkIn(cmd.getLink());
1✔
734
      // Notify the listener if we haven't already.
735
      cmd.stream().transportReportStatus(cmd.reason());
1✔
736
      // Terminate the stream.
737
      encoder().writeRstStream(ctx, cmd.stream().id(), Http2Error.CANCEL.code(), promise);
1✔
738
    }
739
  }
1✔
740

741
  private void gracefulClose(final ChannelHandlerContext ctx, final GracefulServerCloseCommand msg,
742
      ChannelPromise promise) throws Exception {
743
    // Ideally we'd adjust a pre-existing graceful shutdown's grace period to at least what is
744
    // requested here. But that's an edge case and seems bug-prone.
745
    if (gracefulShutdown == null) {
1✔
746
      Long graceTimeInNanos = null;
1✔
747
      if (msg.getGraceTimeUnit() != null) {
1✔
748
        graceTimeInNanos = msg.getGraceTimeUnit().toNanos(msg.getGraceTime());
1✔
749
      }
750
      gracefulShutdown = new GracefulShutdown(msg.getGoAwayDebugString(), graceTimeInNanos);
1✔
751
      gracefulShutdown.start(ctx);
1✔
752
    }
753
    promise.setSuccess();
1✔
754
  }
1✔
755

756
  private void forcefulClose(final ChannelHandlerContext ctx, final ForcefulCloseCommand msg,
757
      ChannelPromise promise) throws Exception {
758
    super.close(ctx, promise);
1✔
759
    connection().forEachActiveStream(new Http2StreamVisitor() {
1✔
760
      @Override
761
      public boolean visit(Http2Stream stream) throws Http2Exception {
762
        NettyServerStream.TransportState serverStream = serverStream(stream);
1✔
763
        if (serverStream != null) {
1✔
764
          try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.forcefulClose")) {
1✔
765
            PerfMark.attachTag(serverStream.tag());
1✔
766
            PerfMark.linkIn(msg.getLink());
1✔
767
            serverStream.transportReportStatus(msg.getStatus());
1✔
768
            resetStream(ctx, stream.id(), Http2Error.CANCEL.code(), ctx.newPromise());
1✔
769
          }
770
        }
771
        stream.close();
1✔
772
        return true;
1✔
773
      }
774
    });
775
  }
1✔
776

777
  private void respondWithHttpError(
778
      ChannelHandlerContext ctx, int streamId, int code, Status.Code statusCode, String msg) {
779
    Metadata metadata = new Metadata();
1✔
780
    metadata.put(InternalStatus.CODE_KEY, statusCode.toStatus());
1✔
781
    metadata.put(InternalStatus.MESSAGE_KEY, msg);
1✔
782
    byte[][] serialized = InternalMetadata.serialize(metadata);
1✔
783

784
    Http2Headers headers = new DefaultHttp2Headers(true, serialized.length / 2)
1✔
785
        .status("" + code)
1✔
786
        .set(CONTENT_TYPE_HEADER, "text/plain; charset=utf-8");
1✔
787
    for (int i = 0; i < serialized.length; i += 2) {
1✔
788
      headers.add(new AsciiString(serialized[i], false), new AsciiString(serialized[i + 1], false));
1✔
789
    }
790
    encoder().writeHeaders(ctx, streamId, headers, 0, false, ctx.newPromise());
1✔
791
    ByteBuf msgBuf = ByteBufUtil.writeUtf8(ctx.alloc(), msg);
1✔
792
    encoder().writeData(ctx, streamId, msgBuf, 0, true, ctx.newPromise());
1✔
793
  }
1✔
794

795
  private Http2Stream requireHttp2Stream(int streamId) {
796
    Http2Stream stream = connection().stream(streamId);
1✔
797
    if (stream == null) {
1✔
798
      // This should never happen.
799
      throw new AssertionError("Stream does not exist: " + streamId);
×
800
    }
801
    return stream;
1✔
802
  }
803

804
  /**
805
   * Returns the server stream associated to the given HTTP/2 stream object.
806
   */
807
  private NettyServerStream.TransportState serverStream(Http2Stream stream) {
808
    return stream == null ? null : (NettyServerStream.TransportState) stream.getProperty(streamKey);
1✔
809
  }
810

811
  private Http2Exception newStreamException(int streamId, Throwable cause) {
812
    return Http2Exception.streamError(
×
813
        streamId, Http2Error.INTERNAL_ERROR, cause, Strings.nullToEmpty(cause.getMessage()));
×
814
  }
815

816
  private class FrameListener extends Http2FrameAdapter {
1✔
817
    private boolean firstSettings = true;
1✔
818

819
    @Override
820
    public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) {
821
      if (firstSettings) {
1✔
822
        firstSettings = false;
1✔
823
        // Delay transportReady until we see the client's HTTP handshake, for coverage with
824
        // handshakeTimeout
825
        attributes = transportListener.transportReady(negotiationAttributes);
1✔
826
      }
827
    }
1✔
828

829
    @Override
830
    public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
831
        boolean endOfStream) throws Http2Exception {
832
      if (keepAliveManager != null) {
1✔
833
        keepAliveManager.onDataReceived();
1✔
834
      }
835
      NettyServerHandler.this.onDataRead(streamId, data, padding, endOfStream);
1✔
836
      return padding;
1✔
837
    }
838

839
    @Override
840
    public void onHeadersRead(ChannelHandlerContext ctx,
841
        int streamId,
842
        Http2Headers headers,
843
        int streamDependency,
844
        short weight,
845
        boolean exclusive,
846
        int padding,
847
        boolean endStream) throws Http2Exception {
848
      if (keepAliveManager != null) {
1✔
849
        keepAliveManager.onDataReceived();
1✔
850
      }
851
      NettyServerHandler.this.onHeadersRead(ctx, streamId, headers);
1✔
852
      if (endStream) {
1✔
853
        NettyServerHandler.this.onDataRead(streamId, Unpooled.EMPTY_BUFFER, 0, endStream);
×
854
      }
855
    }
1✔
856

857
    @Override
858
    public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode)
859
        throws Http2Exception {
860
      if (keepAliveManager != null) {
1✔
861
        keepAliveManager.onDataReceived();
1✔
862
      }
863
      NettyServerHandler.this.onRstStreamRead(streamId, errorCode);
1✔
864
    }
1✔
865

866
    @Override
867
    public void onPingRead(ChannelHandlerContext ctx, long data) throws Http2Exception {
868
      if (keepAliveManager != null) {
1✔
869
        keepAliveManager.onDataReceived();
1✔
870
      }
871
      if (!keepAliveEnforcer.pingAcceptable()) {
1✔
872
        ByteBuf debugData = ByteBufUtil.writeAscii(ctx.alloc(), "too_many_pings");
1✔
873
        goAway(ctx, connection().remote().lastStreamCreated(), Http2Error.ENHANCE_YOUR_CALM.code(),
1✔
874
            debugData, ctx.newPromise());
1✔
875
        Status status = Status.RESOURCE_EXHAUSTED.withDescription("Too many pings from client");
1✔
876
        try {
877
          forcefulClose(ctx, new ForcefulCloseCommand(status), ctx.newPromise());
1✔
878
        } catch (Exception ex) {
×
879
          onError(ctx, /* outbound= */ true, ex);
×
880
        }
1✔
881
      }
882
    }
1✔
883

884
    @Override
885
    public void onPingAckRead(ChannelHandlerContext ctx, long data) throws Http2Exception {
886
      if (keepAliveManager != null) {
1✔
887
        keepAliveManager.onDataReceived();
1✔
888
      }
889
      if (data == flowControlPing().payload()) {
1✔
890
        flowControlPing().updateWindow();
1✔
891
        logger.log(Level.FINE, "Window: {0}",
1✔
892
            decoder().flowController().initialWindowSize(connection().connectionStream()));
1✔
893
      } else if (data == GRACEFUL_SHUTDOWN_PING) {
1✔
894
        if (gracefulShutdown == null) {
1✔
895
          // this should never happen
896
          logger.warning("Received GRACEFUL_SHUTDOWN_PING Ack but gracefulShutdown is null");
×
897
        } else {
898
          gracefulShutdown.secondGoAwayAndClose(ctx);
1✔
899
        }
900
      } else if (data != KEEPALIVE_PING) {
1✔
901
        logger.warning("Received unexpected ping ack. No ping outstanding");
×
902
      }
903
    }
1✔
904
  }
905

906
  private final class KeepAlivePinger implements KeepAliveManager.KeepAlivePinger {
907
    final ChannelHandlerContext ctx;
908

909
    KeepAlivePinger(ChannelHandlerContext ctx) {
1✔
910
      this.ctx = ctx;
1✔
911
    }
1✔
912

913
    @Override
914
    public void ping() {
915
      ChannelFuture pingFuture = encoder().writePing(
1✔
916
          ctx, false /* isAck */, KEEPALIVE_PING, ctx.newPromise());
1✔
917
      ctx.flush();
1✔
918
      pingFuture.addListener(new ChannelFutureListener() {
1✔
919
        @Override
920
        public void operationComplete(ChannelFuture future) throws Exception {
921
          if (future.isSuccess()) {
1✔
922
            transportTracer.reportKeepAliveSent();
1✔
923
          }
924
        }
1✔
925
      });
926
    }
1✔
927

928
    @Override
929
    public void onPingTimeout() {
930
      try {
931
        forcefulClose(
1✔
932
            ctx,
933
            new ForcefulCloseCommand(Status.UNAVAILABLE
934
                .withDescription("Keepalive failed. The connection is likely gone")),
1✔
935
            ctx.newPromise());
1✔
936
      } catch (Exception ex) {
×
937
        try {
938
          exceptionCaught(ctx, ex);
×
939
        } catch (Exception ex2) {
×
940
          logger.log(Level.WARNING, "Exception while propagating exception", ex2);
×
941
          logger.log(Level.WARNING, "Original failure", ex);
×
942
        }
×
943
      }
1✔
944
    }
1✔
945
  }
946

947
  private final class GracefulShutdown {
948
    String goAwayMessage;
949

950
    /**
951
     * The grace time between starting graceful shutdown and closing the netty channel,
952
     * {@code null} is unspecified.
953
     */
954
    @CheckForNull
955
    Long graceTimeInNanos;
956

957
    /**
958
     * True if ping is Acked or ping is timeout.
959
     */
960
    boolean pingAckedOrTimeout;
961

962
    Future<?> pingFuture;
963

964
    GracefulShutdown(String goAwayMessage,
965
        @Nullable Long graceTimeInNanos) {
1✔
966
      this.goAwayMessage = goAwayMessage;
1✔
967
      this.graceTimeInNanos = graceTimeInNanos;
1✔
968
    }
1✔
969

970
    /**
971
     * Sends out first GOAWAY and ping, and schedules second GOAWAY and close.
972
     */
973
    void start(final ChannelHandlerContext ctx) {
974
      goAway(
1✔
975
          ctx,
976
          Integer.MAX_VALUE,
977
          Http2Error.NO_ERROR.code(),
1✔
978
          ByteBufUtil.writeAscii(ctx.alloc(), goAwayMessage),
1✔
979
          ctx.newPromise());
1✔
980

981
      pingFuture = ctx.executor().schedule(
1✔
982
          new Runnable() {
1✔
983
            @Override
984
            public void run() {
985
              secondGoAwayAndClose(ctx);
1✔
986
            }
1✔
987
          },
988
          GRACEFUL_SHUTDOWN_PING_TIMEOUT_NANOS,
1✔
989
          TimeUnit.NANOSECONDS);
990

991
      encoder().writePing(ctx, false /* isAck */, GRACEFUL_SHUTDOWN_PING, ctx.newPromise());
1✔
992
    }
1✔
993

994
    void secondGoAwayAndClose(ChannelHandlerContext ctx) {
995
      if (pingAckedOrTimeout) {
1✔
996
        return;
×
997
      }
998
      pingAckedOrTimeout = true;
1✔
999

1000
      checkNotNull(pingFuture, "pingFuture");
1✔
1001
      pingFuture.cancel(false);
1✔
1002

1003
      // send the second GOAWAY with last stream id
1004
      goAway(
1✔
1005
          ctx,
1006
          connection().remote().lastStreamCreated(),
1✔
1007
          Http2Error.NO_ERROR.code(),
1✔
1008
          ByteBufUtil.writeAscii(ctx.alloc(), goAwayMessage),
1✔
1009
          ctx.newPromise());
1✔
1010

1011
      // gracefully shutdown with specified grace time
1012
      long savedGracefulShutdownTimeMillis = gracefulShutdownTimeoutMillis();
1✔
1013
      long overriddenGraceTime = graceTimeOverrideMillis(savedGracefulShutdownTimeMillis);
1✔
1014
      try {
1015
        gracefulShutdownTimeoutMillis(overriddenGraceTime);
1✔
1016
        NettyServerHandler.super.close(ctx, ctx.newPromise());
1✔
1017
      } catch (Exception e) {
×
1018
        onError(ctx, /* outbound= */ true, e);
×
1019
      } finally {
1020
        gracefulShutdownTimeoutMillis(savedGracefulShutdownTimeMillis);
1✔
1021
      }
1022
    }
1✔
1023

1024
    private long graceTimeOverrideMillis(long originalMillis) {
1025
      if (graceTimeInNanos == null) {
1✔
1026
        return originalMillis;
1✔
1027
      }
1028
      if (graceTimeInNanos == MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE) {
1✔
1029
        // netty treats -1 as "no timeout"
1030
        return -1L;
1✔
1031
      }
1032
      return TimeUnit.NANOSECONDS.toMillis(graceTimeInNanos);
1✔
1033
    }
1034
  }
1035

1036
  // Use a frame writer so that we know when frames are through flow control and actually being
1037
  // written.
1038
  private static class WriteMonitoringFrameWriter extends DecoratingHttp2FrameWriter {
1039
    private final KeepAliveEnforcer keepAliveEnforcer;
1040

1041
    public WriteMonitoringFrameWriter(Http2FrameWriter delegate,
1042
        KeepAliveEnforcer keepAliveEnforcer) {
1043
      super(delegate);
1✔
1044
      this.keepAliveEnforcer = keepAliveEnforcer;
1✔
1045
    }
1✔
1046

1047
    @Override
1048
    public ChannelFuture writeData(ChannelHandlerContext ctx, int streamId, ByteBuf data,
1049
        int padding, boolean endStream, ChannelPromise promise) {
1050
      keepAliveEnforcer.resetCounters();
1✔
1051
      return super.writeData(ctx, streamId, data, padding, endStream, promise);
1✔
1052
    }
1053

1054
    @Override
1055
    public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
1056
        int padding, boolean endStream, ChannelPromise promise) {
1057
      keepAliveEnforcer.resetCounters();
1✔
1058
      return super.writeHeaders(ctx, streamId, headers, padding, endStream, promise);
1✔
1059
    }
1060

1061
    @Override
1062
    public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
1063
        int streamDependency, short weight, boolean exclusive, int padding, boolean endStream,
1064
        ChannelPromise promise) {
1065
      keepAliveEnforcer.resetCounters();
×
1066
      return super.writeHeaders(ctx, streamId, headers, streamDependency, weight, exclusive,
×
1067
          padding, endStream, promise);
1068
    }
1069
  }
1070

1071
  private static class ServerChannelLogger extends ChannelLogger {
1072
    private static final Logger log = Logger.getLogger(ChannelLogger.class.getName());
1✔
1073

1074
    @Override
1075
    public void log(ChannelLogLevel level, String message) {
1076
      log.log(toJavaLogLevel(level), message);
1✔
1077
    }
1✔
1078

1079
    @Override
1080
    public void log(ChannelLogLevel level, String messageFormat, Object... args) {
1081
      log(level, MessageFormat.format(messageFormat, args));
1✔
1082
    }
1✔
1083
  }
1084

1085
  private static Level toJavaLogLevel(ChannelLogLevel level) {
1086
    switch (level) {
1✔
1087
      case ERROR:
1088
        return Level.FINE;
×
1089
      case WARNING:
1090
        return Level.FINER;
×
1091
      default:
1092
        return Level.FINEST;
1✔
1093
    }
1094
  }
1095
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc