• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19157

08 Apr 2024 06:30PM CUT coverage: 88.312%. Remained the same
#19157

push

github

web-flow
buildscripts: Migrate PSM Interop to Artifact Registry (#11079) (#11092)

From Container Registry (gcr.io) to Artifact Registry (pkg.dev).

30337 of 34352 relevant lines covered (88.31%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.3
/../netty/src/main/java/io/grpc/netty/NettyServerHandler.java
1
/*
2
 * Copyright 2014 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.netty;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static io.grpc.internal.GrpcUtil.SERVER_KEEPALIVE_TIME_NANOS_DISABLED;
22
import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE;
23
import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_AGE_NANOS_DISABLED;
24
import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_IDLE_NANOS_DISABLED;
25
import static io.grpc.netty.Utils.CONTENT_TYPE_HEADER;
26
import static io.grpc.netty.Utils.HTTP_METHOD;
27
import static io.grpc.netty.Utils.TE_HEADER;
28
import static io.grpc.netty.Utils.TE_TRAILERS;
29
import static io.netty.handler.codec.http.HttpHeaderNames.CONNECTION;
30
import static io.netty.handler.codec.http.HttpHeaderNames.HOST;
31
import static io.netty.handler.codec.http2.DefaultHttp2LocalFlowController.DEFAULT_WINDOW_UPDATE_RATIO;
32
import static io.netty.handler.codec.http2.Http2Headers.PseudoHeaderName.AUTHORITY;
33

34
import com.google.common.annotations.VisibleForTesting;
35
import com.google.common.base.Preconditions;
36
import com.google.common.base.Strings;
37
import com.google.common.base.Ticker;
38
import io.grpc.Attributes;
39
import io.grpc.ChannelLogger;
40
import io.grpc.ChannelLogger.ChannelLogLevel;
41
import io.grpc.InternalChannelz;
42
import io.grpc.InternalMetadata;
43
import io.grpc.InternalStatus;
44
import io.grpc.Metadata;
45
import io.grpc.ServerStreamTracer;
46
import io.grpc.Status;
47
import io.grpc.internal.GrpcUtil;
48
import io.grpc.internal.KeepAliveEnforcer;
49
import io.grpc.internal.KeepAliveManager;
50
import io.grpc.internal.LogExceptionRunnable;
51
import io.grpc.internal.MaxConnectionIdleManager;
52
import io.grpc.internal.ServerTransportListener;
53
import io.grpc.internal.StatsTraceContext;
54
import io.grpc.internal.TransportTracer;
55
import io.grpc.netty.GrpcHttp2HeadersUtils.GrpcHttp2ServerHeadersDecoder;
56
import io.netty.buffer.ByteBuf;
57
import io.netty.buffer.ByteBufUtil;
58
import io.netty.buffer.Unpooled;
59
import io.netty.channel.ChannelFuture;
60
import io.netty.channel.ChannelFutureListener;
61
import io.netty.channel.ChannelHandlerContext;
62
import io.netty.channel.ChannelPromise;
63
import io.netty.handler.codec.http2.DecoratingHttp2FrameWriter;
64
import io.netty.handler.codec.http2.DefaultHttp2Connection;
65
import io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder;
66
import io.netty.handler.codec.http2.DefaultHttp2ConnectionEncoder;
67
import io.netty.handler.codec.http2.DefaultHttp2FrameReader;
68
import io.netty.handler.codec.http2.DefaultHttp2FrameWriter;
69
import io.netty.handler.codec.http2.DefaultHttp2Headers;
70
import io.netty.handler.codec.http2.DefaultHttp2LocalFlowController;
71
import io.netty.handler.codec.http2.DefaultHttp2RemoteFlowController;
72
import io.netty.handler.codec.http2.Http2Connection;
73
import io.netty.handler.codec.http2.Http2ConnectionAdapter;
74
import io.netty.handler.codec.http2.Http2ConnectionDecoder;
75
import io.netty.handler.codec.http2.Http2ConnectionEncoder;
76
import io.netty.handler.codec.http2.Http2Error;
77
import io.netty.handler.codec.http2.Http2Exception;
78
import io.netty.handler.codec.http2.Http2Exception.StreamException;
79
import io.netty.handler.codec.http2.Http2FrameAdapter;
80
import io.netty.handler.codec.http2.Http2FrameLogger;
81
import io.netty.handler.codec.http2.Http2FrameReader;
82
import io.netty.handler.codec.http2.Http2FrameWriter;
83
import io.netty.handler.codec.http2.Http2Headers;
84
import io.netty.handler.codec.http2.Http2HeadersDecoder;
85
import io.netty.handler.codec.http2.Http2InboundFrameLogger;
86
import io.netty.handler.codec.http2.Http2OutboundFrameLogger;
87
import io.netty.handler.codec.http2.Http2Settings;
88
import io.netty.handler.codec.http2.Http2Stream;
89
import io.netty.handler.codec.http2.Http2StreamVisitor;
90
import io.netty.handler.codec.http2.WeightedFairQueueByteDistributor;
91
import io.netty.handler.logging.LogLevel;
92
import io.netty.util.AsciiString;
93
import io.netty.util.ReferenceCountUtil;
94
import io.perfmark.PerfMark;
95
import io.perfmark.Tag;
96
import io.perfmark.TaskCloseable;
97
import java.text.MessageFormat;
98
import java.util.List;
99
import java.util.concurrent.Future;
100
import java.util.concurrent.ScheduledFuture;
101
import java.util.concurrent.TimeUnit;
102
import java.util.logging.Level;
103
import java.util.logging.Logger;
104
import javax.annotation.CheckForNull;
105
import javax.annotation.Nullable;
106

107
/**
108
 * Server-side Netty handler for GRPC processing. All event handlers are executed entirely within
109
 * the context of the Netty Channel thread.
110
 */
111
class NettyServerHandler extends AbstractNettyHandler {
112
  private static final Logger logger = Logger.getLogger(NettyServerHandler.class.getName());
1✔
113
  private static final long KEEPALIVE_PING = 0xDEADL;
114
  @VisibleForTesting
115
  static final long GRACEFUL_SHUTDOWN_PING = 0x97ACEF001L;
116
  private static final long GRACEFUL_SHUTDOWN_PING_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(10);
1✔
117
  /** Temporary workaround for #8674. Fine to delete after v1.45 release, and maybe earlier. */
118
  private static final boolean DISABLE_CONNECTION_HEADER_CHECK = Boolean.parseBoolean(
1✔
119
      System.getProperty("io.grpc.netty.disableConnectionHeaderCheck", "false"));
1✔
120

121
  private final Http2Connection.PropertyKey streamKey;
122
  private final ServerTransportListener transportListener;
123
  private final int maxMessageSize;
124
  private final long keepAliveTimeInNanos;
125
  private final long keepAliveTimeoutInNanos;
126
  private final long maxConnectionAgeInNanos;
127
  private final long maxConnectionAgeGraceInNanos;
128
  private final List<? extends ServerStreamTracer.Factory> streamTracerFactories;
129
  private final TransportTracer transportTracer;
130
  private final KeepAliveEnforcer keepAliveEnforcer;
131
  private final Attributes eagAttributes;
132
  /** Incomplete attributes produced by negotiator. */
133
  private Attributes negotiationAttributes;
134
  private InternalChannelz.Security securityInfo;
135
  /** Completed attributes produced by transportReady. */
136
  private Attributes attributes;
137
  private Throwable connectionError;
138
  private boolean teWarningLogged;
139
  private WriteQueue serverWriteQueue;
140
  private AsciiString lastKnownAuthority;
141
  @CheckForNull
142
  private KeepAliveManager keepAliveManager;
143
  @CheckForNull
144
  private MaxConnectionIdleManager maxConnectionIdleManager;
145
  @CheckForNull
146
  private ScheduledFuture<?> maxConnectionAgeMonitor;
147
  @CheckForNull
148
  private GracefulShutdown gracefulShutdown;
149

150
  static NettyServerHandler newHandler(
151
      ServerTransportListener transportListener,
152
      ChannelPromise channelUnused,
153
      List<? extends ServerStreamTracer.Factory> streamTracerFactories,
154
      TransportTracer transportTracer,
155
      int maxStreams,
156
      boolean autoFlowControl,
157
      int flowControlWindow,
158
      int maxHeaderListSize,
159
      int maxMessageSize,
160
      long keepAliveTimeInNanos,
161
      long keepAliveTimeoutInNanos,
162
      long maxConnectionIdleInNanos,
163
      long maxConnectionAgeInNanos,
164
      long maxConnectionAgeGraceInNanos,
165
      boolean permitKeepAliveWithoutCalls,
166
      long permitKeepAliveTimeInNanos,
167
      Attributes eagAttributes) {
168
    Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive: %s",
1✔
169
        maxHeaderListSize);
170
    Http2FrameLogger frameLogger = new Http2FrameLogger(LogLevel.DEBUG, NettyServerHandler.class);
1✔
171
    Http2HeadersDecoder headersDecoder = new GrpcHttp2ServerHeadersDecoder(maxHeaderListSize);
1✔
172
    Http2FrameReader frameReader = new Http2InboundFrameLogger(
1✔
173
        new DefaultHttp2FrameReader(headersDecoder), frameLogger);
174
    Http2FrameWriter frameWriter =
1✔
175
        new Http2OutboundFrameLogger(new DefaultHttp2FrameWriter(), frameLogger);
176
    return newHandler(
1✔
177
        channelUnused,
178
        frameReader,
179
        frameWriter,
180
        transportListener,
181
        streamTracerFactories,
182
        transportTracer,
183
        maxStreams,
184
        autoFlowControl,
185
        flowControlWindow,
186
        maxHeaderListSize,
187
        maxMessageSize,
188
        keepAliveTimeInNanos,
189
        keepAliveTimeoutInNanos,
190
        maxConnectionIdleInNanos,
191
        maxConnectionAgeInNanos,
192
        maxConnectionAgeGraceInNanos,
193
        permitKeepAliveWithoutCalls,
194
        permitKeepAliveTimeInNanos,
195
        eagAttributes,
196
        Ticker.systemTicker());
1✔
197
  }
198

199
  static NettyServerHandler newHandler(
200
      ChannelPromise channelUnused,
201
      Http2FrameReader frameReader,
202
      Http2FrameWriter frameWriter,
203
      ServerTransportListener transportListener,
204
      List<? extends ServerStreamTracer.Factory> streamTracerFactories,
205
      TransportTracer transportTracer,
206
      int maxStreams,
207
      boolean autoFlowControl,
208
      int flowControlWindow,
209
      int maxHeaderListSize,
210
      int maxMessageSize,
211
      long keepAliveTimeInNanos,
212
      long keepAliveTimeoutInNanos,
213
      long maxConnectionIdleInNanos,
214
      long maxConnectionAgeInNanos,
215
      long maxConnectionAgeGraceInNanos,
216
      boolean permitKeepAliveWithoutCalls,
217
      long permitKeepAliveTimeInNanos,
218
      Attributes eagAttributes,
219
      Ticker ticker) {
220
    Preconditions.checkArgument(maxStreams > 0, "maxStreams must be positive: %s", maxStreams);
1✔
221
    Preconditions.checkArgument(flowControlWindow > 0, "flowControlWindow must be positive: %s",
1✔
222
        flowControlWindow);
223
    Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive: %s",
1✔
224
        maxHeaderListSize);
225
    Preconditions.checkArgument(maxMessageSize > 0, "maxMessageSize must be positive: %s",
1✔
226
        maxMessageSize);
227

228
    final Http2Connection connection = new DefaultHttp2Connection(true);
1✔
229
    WeightedFairQueueByteDistributor dist = new WeightedFairQueueByteDistributor(connection);
1✔
230
    dist.allocationQuantum(16 * 1024); // Make benchmarks fast again.
1✔
231
    DefaultHttp2RemoteFlowController controller =
1✔
232
        new DefaultHttp2RemoteFlowController(connection, dist);
233
    connection.remote().flowController(controller);
1✔
234
    final KeepAliveEnforcer keepAliveEnforcer = new KeepAliveEnforcer(
1✔
235
        permitKeepAliveWithoutCalls, permitKeepAliveTimeInNanos, TimeUnit.NANOSECONDS);
236

237
    // Create the local flow controller configured to auto-refill the connection window.
238
    connection.local().flowController(
1✔
239
        new DefaultHttp2LocalFlowController(connection, DEFAULT_WINDOW_UPDATE_RATIO, true));
240
    frameWriter = new WriteMonitoringFrameWriter(frameWriter, keepAliveEnforcer);
1✔
241
    Http2ConnectionEncoder encoder =
1✔
242
        new DefaultHttp2ConnectionEncoder(connection, frameWriter);
243
    encoder = new Http2ControlFrameLimitEncoder(encoder, 10000);
1✔
244
    Http2ConnectionDecoder decoder = new DefaultHttp2ConnectionDecoder(connection, encoder,
1✔
245
        frameReader);
246

247
    Http2Settings settings = new Http2Settings();
1✔
248
    settings.initialWindowSize(flowControlWindow);
1✔
249
    settings.maxConcurrentStreams(maxStreams);
1✔
250
    settings.maxHeaderListSize(maxHeaderListSize);
1✔
251

252
    if (ticker == null) {
1✔
253
      ticker = Ticker.systemTicker();
×
254
    }
255

256
    return new NettyServerHandler(
1✔
257
        channelUnused,
258
        connection,
259
        transportListener,
260
        streamTracerFactories,
261
        transportTracer,
262
        decoder, encoder, settings,
263
        maxMessageSize,
264
        keepAliveTimeInNanos, keepAliveTimeoutInNanos,
265
        maxConnectionIdleInNanos,
266
        maxConnectionAgeInNanos, maxConnectionAgeGraceInNanos,
267
        keepAliveEnforcer,
268
        autoFlowControl,
269
        eagAttributes, ticker);
270
  }
271

272
  private NettyServerHandler(
273
      ChannelPromise channelUnused,
274
      final Http2Connection connection,
275
      ServerTransportListener transportListener,
276
      List<? extends ServerStreamTracer.Factory> streamTracerFactories,
277
      TransportTracer transportTracer,
278
      Http2ConnectionDecoder decoder,
279
      Http2ConnectionEncoder encoder,
280
      Http2Settings settings,
281
      int maxMessageSize,
282
      long keepAliveTimeInNanos,
283
      long keepAliveTimeoutInNanos,
284
      long maxConnectionIdleInNanos,
285
      long maxConnectionAgeInNanos,
286
      long maxConnectionAgeGraceInNanos,
287
      final KeepAliveEnforcer keepAliveEnforcer,
288
      boolean autoFlowControl,
289
      Attributes eagAttributes,
290
      Ticker ticker) {
291
    super(channelUnused, decoder, encoder, settings, new ServerChannelLogger(),
1✔
292
        autoFlowControl, null, ticker);
293

294
    final MaxConnectionIdleManager maxConnectionIdleManager;
295
    if (maxConnectionIdleInNanos == MAX_CONNECTION_IDLE_NANOS_DISABLED) {
1✔
296
      maxConnectionIdleManager = null;
1✔
297
    } else {
298
      maxConnectionIdleManager = new MaxConnectionIdleManager(maxConnectionIdleInNanos);
1✔
299
    }
300

301
    connection.addListener(new Http2ConnectionAdapter() {
1✔
302
      @Override
303
      public void onStreamActive(Http2Stream stream) {
304
        if (connection.numActiveStreams() == 1) {
1✔
305
          keepAliveEnforcer.onTransportActive();
1✔
306
          if (maxConnectionIdleManager != null) {
1✔
307
            maxConnectionIdleManager.onTransportActive();
1✔
308
          }
309
        }
310
      }
1✔
311

312
      @Override
313
      public void onStreamClosed(Http2Stream stream) {
314
        if (connection.numActiveStreams() == 0) {
1✔
315
          keepAliveEnforcer.onTransportIdle();
1✔
316
          if (maxConnectionIdleManager != null) {
1✔
317
            maxConnectionIdleManager.onTransportIdle();
1✔
318
          }
319
        }
320
      }
1✔
321
    });
322

323
    checkArgument(maxMessageSize >= 0, "maxMessageSize must be non-negative: %s", maxMessageSize);
1✔
324
    this.maxMessageSize = maxMessageSize;
1✔
325
    this.keepAliveTimeInNanos = keepAliveTimeInNanos;
1✔
326
    this.keepAliveTimeoutInNanos = keepAliveTimeoutInNanos;
1✔
327
    this.maxConnectionIdleManager = maxConnectionIdleManager;
1✔
328
    this.maxConnectionAgeInNanos = maxConnectionAgeInNanos;
1✔
329
    this.maxConnectionAgeGraceInNanos = maxConnectionAgeGraceInNanos;
1✔
330
    this.keepAliveEnforcer = checkNotNull(keepAliveEnforcer, "keepAliveEnforcer");
1✔
331
    this.eagAttributes = checkNotNull(eagAttributes, "eagAttributes");
1✔
332

333
    streamKey = encoder.connection().newKey();
1✔
334
    this.transportListener = checkNotNull(transportListener, "transportListener");
1✔
335
    this.streamTracerFactories = checkNotNull(streamTracerFactories, "streamTracerFactories");
1✔
336
    this.transportTracer = checkNotNull(transportTracer, "transportTracer");
1✔
337
    // Set the frame listener on the decoder.
338
    decoder().frameListener(new FrameListener());
1✔
339
  }
1✔
340

341
  @Nullable
342
  Throwable connectionError() {
343
    return connectionError;
1✔
344
  }
345

346
  @Override
347
  public void handlerAdded(final ChannelHandlerContext ctx) throws Exception {
348
    serverWriteQueue = new WriteQueue(ctx.channel());
1✔
349

350
    // init max connection age monitor
351
    if (maxConnectionAgeInNanos != MAX_CONNECTION_AGE_NANOS_DISABLED) {
1✔
352
      maxConnectionAgeMonitor = ctx.executor().schedule(
1✔
353
          new LogExceptionRunnable(new Runnable() {
1✔
354
            @Override
355
            public void run() {
356
              if (gracefulShutdown == null) {
1✔
357
                gracefulShutdown = new GracefulShutdown("max_age", maxConnectionAgeGraceInNanos);
1✔
358
                gracefulShutdown.start(ctx);
1✔
359
                ctx.flush();
1✔
360
              }
361
            }
1✔
362
          }),
363
          maxConnectionAgeInNanos,
364
          TimeUnit.NANOSECONDS);
365
    }
366

367
    if (maxConnectionIdleManager != null) {
1✔
368
      maxConnectionIdleManager.start(new Runnable() {
1✔
369
        @Override
370
        public void run() {
371
          if (gracefulShutdown == null) {
1✔
372
            gracefulShutdown = new GracefulShutdown("max_idle", null);
1✔
373
            gracefulShutdown.start(ctx);
1✔
374
            ctx.flush();
1✔
375
          }
376
        }
1✔
377
      }, ctx.executor());
1✔
378
    }
379

380
    if (keepAliveTimeInNanos != SERVER_KEEPALIVE_TIME_NANOS_DISABLED) {
1✔
381
      keepAliveManager = new KeepAliveManager(new KeepAlivePinger(ctx), ctx.executor(),
1✔
382
          keepAliveTimeInNanos, keepAliveTimeoutInNanos, true /* keepAliveDuringTransportIdle */);
383
      keepAliveManager.onTransportStarted();
1✔
384
    }
385

386
    assert encoder().connection().equals(decoder().connection());
1✔
387
    transportTracer.setFlowControlWindowReader(new Utils.FlowControlReader(encoder().connection()));
1✔
388

389
    super.handlerAdded(ctx);
1✔
390
  }
1✔
391

392
  private void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers)
393
      throws Http2Exception {
394
    try {
395
      // Connection-specific header fields makes a request malformed. Ideally this would be handled
396
      // by Netty. RFC 7540 section 8.1.2.2
397
      if (!DISABLE_CONNECTION_HEADER_CHECK && headers.contains(CONNECTION)) {
1✔
398
        resetStream(ctx, streamId, Http2Error.PROTOCOL_ERROR.code(), ctx.newPromise());
×
399
        return;
×
400
      }
401

402
      if (headers.authority() == null) {
1✔
403
        List<CharSequence> hosts = headers.getAll(HOST);
1✔
404
        if (hosts.size() > 1) {
1✔
405
          // RFC 7230 section 5.4
406
          respondWithHttpError(ctx, streamId, 400, Status.Code.INTERNAL,
1✔
407
              "Multiple host headers");
408
          return;
1✔
409
        }
410
        if (!hosts.isEmpty()) {
1✔
411
          headers.add(AUTHORITY.value(), hosts.get(0));
1✔
412
        }
413
      }
414
      headers.remove(HOST);
1✔
415

416
      // Remove the leading slash of the path and get the fully qualified method name
417
      CharSequence path = headers.path();
1✔
418

419
      if (path == null) {
1✔
420
        respondWithHttpError(ctx, streamId, 404, Status.Code.UNIMPLEMENTED,
1✔
421
            "Expected path but is missing");
422
        return;
1✔
423
      }
424

425
      if (path.charAt(0) != '/') {
1✔
426
        respondWithHttpError(ctx, streamId, 404, Status.Code.UNIMPLEMENTED,
1✔
427
            String.format("Expected path to start with /: %s", path));
1✔
428
        return;
1✔
429
      }
430

431
      String method = path.subSequence(1, path.length()).toString();
1✔
432

433
      // Verify that the Content-Type is correct in the request.
434
      CharSequence contentType = headers.get(CONTENT_TYPE_HEADER);
1✔
435
      if (contentType == null) {
1✔
436
        respondWithHttpError(
1✔
437
            ctx, streamId, 415, Status.Code.INTERNAL, "Content-Type is missing from the request");
438
        return;
1✔
439
      }
440
      String contentTypeString = contentType.toString();
1✔
441
      if (!GrpcUtil.isGrpcContentType(contentTypeString)) {
1✔
442
        respondWithHttpError(ctx, streamId, 415, Status.Code.INTERNAL,
1✔
443
            String.format("Content-Type '%s' is not supported", contentTypeString));
1✔
444
        return;
1✔
445
      }
446

447
      if (!HTTP_METHOD.contentEquals(headers.method())) {
1✔
448
        respondWithHttpError(ctx, streamId, 405, Status.Code.INTERNAL,
1✔
449
            String.format("Method '%s' is not supported", headers.method()));
1✔
450
        return;
1✔
451
      }
452

453
      if (!teWarningLogged && !TE_TRAILERS.contentEquals(headers.get(TE_HEADER))) {
1✔
454
        logger.warning(String.format("Expected header TE: %s, but %s is received. This means "
1✔
455
                + "some intermediate proxy may not support trailers",
456
            TE_TRAILERS, headers.get(TE_HEADER)));
1✔
457
        teWarningLogged = true;
1✔
458
      }
459

460
      // The Http2Stream object was put by AbstractHttp2ConnectionHandler before calling this
461
      // method.
462
      Http2Stream http2Stream = requireHttp2Stream(streamId);
1✔
463

464
      Metadata metadata = Utils.convertHeaders(headers);
1✔
465
      StatsTraceContext statsTraceCtx =
1✔
466
          StatsTraceContext.newServerContext(streamTracerFactories, method, metadata);
1✔
467

468
      NettyServerStream.TransportState state = new NettyServerStream.TransportState(
1✔
469
          this,
470
          ctx.channel().eventLoop(),
1✔
471
          http2Stream,
472
          maxMessageSize,
473
          statsTraceCtx,
474
          transportTracer,
475
          method);
476

477
      try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.onHeadersRead")) {
1✔
478
        PerfMark.attachTag(state.tag());
1✔
479
        String authority = getOrUpdateAuthority((AsciiString) headers.authority());
1✔
480
        NettyServerStream stream = new NettyServerStream(
1✔
481
            ctx.channel(),
1✔
482
            state,
483
            attributes,
484
            authority,
485
            statsTraceCtx,
486
            transportTracer);
487
        transportListener.streamCreated(stream, method, metadata);
1✔
488
        state.onStreamAllocated();
1✔
489
        http2Stream.setProperty(streamKey, state);
1✔
490
      }
491
    } catch (Exception e) {
×
492
      logger.log(Level.WARNING, "Exception in onHeadersRead()", e);
×
493
      // Throw an exception that will get handled by onStreamError.
494
      throw newStreamException(streamId, e);
×
495
    }
1✔
496
  }
1✔
497

498
  private String getOrUpdateAuthority(AsciiString authority) {
499
    if (authority == null) {
1✔
500
      return null;
1✔
501
    } else if (!authority.equals(lastKnownAuthority)) {
1✔
502
      lastKnownAuthority = authority;
1✔
503
    }
504

505
    // AsciiString.toString() is internally cached, so subsequent calls will not
506
    // result in recomputing the String representation of lastKnownAuthority.
507
    return lastKnownAuthority.toString();
1✔
508
  }
509

510
  private void onDataRead(int streamId, ByteBuf data, int padding, boolean endOfStream)
511
      throws Http2Exception {
512
    flowControlPing().onDataRead(data.readableBytes(), padding);
1✔
513
    try {
514
      NettyServerStream.TransportState stream = serverStream(requireHttp2Stream(streamId));
1✔
515
      if (stream == null) {
1✔
516
        return;
1✔
517
      }
518
      try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.onDataRead")) {
1✔
519
        PerfMark.attachTag(stream.tag());
1✔
520
        stream.inboundDataReceived(data, endOfStream);
1✔
521
      }
522
    } catch (Throwable e) {
×
523
      logger.log(Level.WARNING, "Exception in onDataRead()", e);
×
524
      // Throw an exception that will get handled by onStreamError.
525
      throw newStreamException(streamId, e);
×
526
    }
1✔
527
  }
1✔
528

529
  private void onRstStreamRead(int streamId, long errorCode) throws Http2Exception {
530
    try {
531
      NettyServerStream.TransportState stream = serverStream(connection().stream(streamId));
1✔
532
      if (stream != null) {
1✔
533
        try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.onRstStreamRead")) {
1✔
534
          PerfMark.attachTag(stream.tag());
1✔
535
          stream.transportReportStatus(
1✔
536
              Status.CANCELLED.withDescription("RST_STREAM received for code " + errorCode));
1✔
537
        }
538
      }
539
    } catch (Throwable e) {
×
540
      logger.log(Level.WARNING, "Exception in onRstStreamRead()", e);
×
541
      // Throw an exception that will get handled by onStreamError.
542
      throw newStreamException(streamId, e);
×
543
    }
1✔
544
  }
1✔
545

546
  @Override
547
  protected void onConnectionError(ChannelHandlerContext ctx, boolean outbound, Throwable cause,
548
      Http2Exception http2Ex) {
549
    logger.log(Level.FINE, "Connection Error", cause);
1✔
550
    connectionError = cause;
1✔
551
    super.onConnectionError(ctx, outbound, cause, http2Ex);
1✔
552
  }
1✔
553

554
  @Override
555
  protected void onStreamError(ChannelHandlerContext ctx, boolean outbound, Throwable cause,
556
      StreamException http2Ex) {
557
    NettyServerStream.TransportState serverStream = serverStream(
1✔
558
        connection().stream(Http2Exception.streamId(http2Ex)));
1✔
559
    Level level = Level.WARNING;
1✔
560
    if (serverStream == null && http2Ex.error() == Http2Error.STREAM_CLOSED) {
1✔
561
      level = Level.FINE;
×
562
    }
563
    logger.log(level, "Stream Error", cause);
1✔
564
    Tag tag = serverStream != null ? serverStream.tag() : PerfMark.createTag();
1✔
565
    try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.onStreamError")) {
1✔
566
      PerfMark.attachTag(tag);
1✔
567
      if (serverStream != null) {
1✔
568
        serverStream.transportReportStatus(Utils.statusFromThrowable(cause));
1✔
569
      }
570
      // TODO(ejona): Abort the stream by sending headers to help the client with debugging.
571
      // Delegate to the base class to send a RST_STREAM.
572
      super.onStreamError(ctx, outbound, cause, http2Ex);
1✔
573
    }
574
  }
1✔
575

576
  @Override
577
  public void handleProtocolNegotiationCompleted(
578
      Attributes attrs, InternalChannelz.Security securityInfo) {
579
    negotiationAttributes = attrs;
1✔
580
    this.securityInfo = securityInfo;
1✔
581
    super.handleProtocolNegotiationCompleted(attrs, securityInfo);
1✔
582
    NettyClientHandler.writeBufferingAndRemove(ctx().channel());
1✔
583
  }
1✔
584

585
  @Override
586
  public Attributes getEagAttributes() {
587
    return eagAttributes;
1✔
588
  }
589

590
  InternalChannelz.Security getSecurityInfo() {
591
    return securityInfo;
1✔
592
  }
593

594
  @VisibleForTesting
595
  KeepAliveManager getKeepAliveManagerForTest() {
596
    return keepAliveManager;
1✔
597
  }
598

599
  @VisibleForTesting
600
  void setKeepAliveManagerForTest(KeepAliveManager keepAliveManager) {
601
    this.keepAliveManager = keepAliveManager;
1✔
602
  }
1✔
603

604
  /**
605
   * Handler for the Channel shutting down.
606
   */
607
  @Override
608
  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
609
    try {
610
      if (keepAliveManager != null) {
1✔
611
        keepAliveManager.onTransportTermination();
1✔
612
      }
613
      if (maxConnectionIdleManager != null) {
1✔
614
        maxConnectionIdleManager.onTransportTermination();
1✔
615
      }
616
      if (maxConnectionAgeMonitor != null) {
1✔
617
        maxConnectionAgeMonitor.cancel(false);
1✔
618
      }
619
      final Status status =
1✔
620
          Status.UNAVAILABLE.withDescription("connection terminated for unknown reason");
1✔
621
      // Any streams that are still active must be closed
622
      connection().forEachActiveStream(new Http2StreamVisitor() {
1✔
623
        @Override
624
        public boolean visit(Http2Stream stream) throws Http2Exception {
625
          NettyServerStream.TransportState serverStream = serverStream(stream);
1✔
626
          if (serverStream != null) {
1✔
627
            serverStream.transportReportStatus(status);
1✔
628
          }
629
          return true;
1✔
630
        }
631
      });
632
    } finally {
633
      super.channelInactive(ctx);
1✔
634
    }
635
  }
1✔
636

637
  WriteQueue getWriteQueue() {
638
    return serverWriteQueue;
1✔
639
  }
640

641
  /**
642
   * Handler for commands sent from the stream.
643
   */
644
  @Override
645
  public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
646
      throws Exception {
647
    if (msg instanceof SendGrpcFrameCommand) {
1✔
648
      sendGrpcFrame(ctx, (SendGrpcFrameCommand) msg, promise);
1✔
649
    } else if (msg instanceof SendResponseHeadersCommand) {
1✔
650
      sendResponseHeaders(ctx, (SendResponseHeadersCommand) msg, promise);
1✔
651
    } else if (msg instanceof CancelServerStreamCommand) {
1✔
652
      cancelStream(ctx, (CancelServerStreamCommand) msg, promise);
1✔
653
    } else if (msg instanceof GracefulServerCloseCommand) {
1✔
654
      gracefulClose(ctx, (GracefulServerCloseCommand) msg, promise);
1✔
655
    } else if (msg instanceof ForcefulCloseCommand) {
1✔
656
      forcefulClose(ctx, (ForcefulCloseCommand) msg, promise);
1✔
657
    } else {
658
      AssertionError e =
×
659
          new AssertionError("Write called for unexpected type: " + msg.getClass().getName());
×
660
      ReferenceCountUtil.release(msg);
×
661
      promise.setFailure(e);
×
662
      throw e;
×
663
    }
664
  }
1✔
665

666
  @Override
667
  public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
668
    gracefulClose(ctx, new GracefulServerCloseCommand("app_requested"), promise);
1✔
669
    ctx.flush();
1✔
670
  }
1✔
671

672
  /**
673
   * Returns the given processed bytes back to inbound flow control.
674
   */
675
  void returnProcessedBytes(Http2Stream http2Stream, int bytes) {
676
    try {
677
      decoder().flowController().consumeBytes(http2Stream, bytes);
1✔
678
    } catch (Http2Exception e) {
×
679
      throw new RuntimeException(e);
×
680
    }
1✔
681
  }
1✔
682

683
  private void closeStreamWhenDone(ChannelPromise promise, int streamId) throws Http2Exception {
684
    final NettyServerStream.TransportState stream = serverStream(requireHttp2Stream(streamId));
1✔
685
    if (stream != null) {
1✔
686
      promise.addListener(new ChannelFutureListener() {
1✔
687
        @Override
688
        public void operationComplete(ChannelFuture future) {
689
          stream.complete();
1✔
690
        }
1✔
691
      });
692
    }
693
  }
1✔
694

695
  /**
696
   * Sends the given gRPC frame to the client.
697
   */
698
  private void sendGrpcFrame(ChannelHandlerContext ctx, SendGrpcFrameCommand cmd,
699
      ChannelPromise promise) throws Http2Exception {
700
    try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.sendGrpcFrame")) {
1✔
701
      PerfMark.attachTag(cmd.stream().tag());
1✔
702
      PerfMark.linkIn(cmd.getLink());
1✔
703
      if (cmd.endStream()) {
1✔
704
        closeStreamWhenDone(promise, cmd.stream().id());
×
705
      }
706
      // Call the base class to write the HTTP/2 DATA frame.
707
      encoder().writeData(ctx, cmd.stream().id(), cmd.content(), 0, cmd.endStream(), promise);
1✔
708
    }
709
  }
1✔
710

711
  /**
712
   * Sends the response headers to the client.
713
   */
714
  private void sendResponseHeaders(ChannelHandlerContext ctx, SendResponseHeadersCommand cmd,
715
      ChannelPromise promise) throws Http2Exception {
716
    try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.sendResponseHeaders")) {
1✔
717
      PerfMark.attachTag(cmd.stream().tag());
1✔
718
      PerfMark.linkIn(cmd.getLink());
1✔
719
      // TODO(carl-mastrangelo): remove this check once https://github.com/netty/netty/issues/6296
720
      // is fixed.
721
      int streamId = cmd.stream().id();
1✔
722
      Http2Stream stream = connection().stream(streamId);
1✔
723
      if (stream == null) {
1✔
724
        resetStream(ctx, streamId, Http2Error.CANCEL.code(), promise);
1✔
725
        return;
1✔
726
      }
727
      if (cmd.endOfStream()) {
1✔
728
        closeStreamWhenDone(promise, streamId);
1✔
729
      }
730
      encoder().writeHeaders(ctx, streamId, cmd.headers(), 0, cmd.endOfStream(), promise);
1✔
731
    }
732
  }
1✔
733

734
  private void cancelStream(ChannelHandlerContext ctx, CancelServerStreamCommand cmd,
735
      ChannelPromise promise) {
736
    try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.cancelStream")) {
1✔
737
      PerfMark.attachTag(cmd.stream().tag());
1✔
738
      PerfMark.linkIn(cmd.getLink());
1✔
739
      // Notify the listener if we haven't already.
740
      cmd.stream().transportReportStatus(cmd.reason());
1✔
741
      // Terminate the stream.
742
      encoder().writeRstStream(ctx, cmd.stream().id(), Http2Error.CANCEL.code(), promise);
1✔
743
    }
744
  }
1✔
745

746
  private void gracefulClose(final ChannelHandlerContext ctx, final GracefulServerCloseCommand msg,
747
      ChannelPromise promise) throws Exception {
748
    // Ideally we'd adjust a pre-existing graceful shutdown's grace period to at least what is
749
    // requested here. But that's an edge case and seems bug-prone.
750
    if (gracefulShutdown == null) {
1✔
751
      Long graceTimeInNanos = null;
1✔
752
      if (msg.getGraceTimeUnit() != null) {
1✔
753
        graceTimeInNanos = msg.getGraceTimeUnit().toNanos(msg.getGraceTime());
1✔
754
      }
755
      gracefulShutdown = new GracefulShutdown(msg.getGoAwayDebugString(), graceTimeInNanos);
1✔
756
      gracefulShutdown.start(ctx);
1✔
757
    }
758
    promise.setSuccess();
1✔
759
  }
1✔
760

761
  private void forcefulClose(final ChannelHandlerContext ctx, final ForcefulCloseCommand msg,
762
      ChannelPromise promise) throws Exception {
763
    super.close(ctx, promise);
1✔
764
    connection().forEachActiveStream(new Http2StreamVisitor() {
1✔
765
      @Override
766
      public boolean visit(Http2Stream stream) throws Http2Exception {
767
        NettyServerStream.TransportState serverStream = serverStream(stream);
1✔
768
        if (serverStream != null) {
1✔
769
          try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.forcefulClose")) {
1✔
770
            PerfMark.attachTag(serverStream.tag());
1✔
771
            PerfMark.linkIn(msg.getLink());
1✔
772
            serverStream.transportReportStatus(msg.getStatus());
1✔
773
            resetStream(ctx, stream.id(), Http2Error.CANCEL.code(), ctx.newPromise());
1✔
774
          }
775
        }
776
        stream.close();
1✔
777
        return true;
1✔
778
      }
779
    });
780
  }
1✔
781

782
  private void respondWithHttpError(
783
      ChannelHandlerContext ctx, int streamId, int code, Status.Code statusCode, String msg) {
784
    Metadata metadata = new Metadata();
1✔
785
    metadata.put(InternalStatus.CODE_KEY, statusCode.toStatus());
1✔
786
    metadata.put(InternalStatus.MESSAGE_KEY, msg);
1✔
787
    byte[][] serialized = InternalMetadata.serialize(metadata);
1✔
788

789
    Http2Headers headers = new DefaultHttp2Headers(true, serialized.length / 2)
1✔
790
        .status("" + code)
1✔
791
        .set(CONTENT_TYPE_HEADER, "text/plain; charset=utf-8");
1✔
792
    for (int i = 0; i < serialized.length; i += 2) {
1✔
793
      headers.add(new AsciiString(serialized[i], false), new AsciiString(serialized[i + 1], false));
1✔
794
    }
795
    encoder().writeHeaders(ctx, streamId, headers, 0, false, ctx.newPromise());
1✔
796
    ByteBuf msgBuf = ByteBufUtil.writeUtf8(ctx.alloc(), msg);
1✔
797
    encoder().writeData(ctx, streamId, msgBuf, 0, true, ctx.newPromise());
1✔
798
  }
1✔
799

800
  private Http2Stream requireHttp2Stream(int streamId) {
801
    Http2Stream stream = connection().stream(streamId);
1✔
802
    if (stream == null) {
1✔
803
      // This should never happen.
804
      throw new AssertionError("Stream does not exist: " + streamId);
×
805
    }
806
    return stream;
1✔
807
  }
808

809
  /**
810
   * Returns the server stream associated to the given HTTP/2 stream object.
811
   */
812
  private NettyServerStream.TransportState serverStream(Http2Stream stream) {
813
    return stream == null ? null : (NettyServerStream.TransportState) stream.getProperty(streamKey);
1✔
814
  }
815

816
  private Http2Exception newStreamException(int streamId, Throwable cause) {
817
    return Http2Exception.streamError(
×
818
        streamId, Http2Error.INTERNAL_ERROR, cause, Strings.nullToEmpty(cause.getMessage()));
×
819
  }
820

821
  private class FrameListener extends Http2FrameAdapter {
1✔
822
    private boolean firstSettings = true;
1✔
823

824
    @Override
825
    public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) {
826
      if (firstSettings) {
1✔
827
        firstSettings = false;
1✔
828
        // Delay transportReady until we see the client's HTTP handshake, for coverage with
829
        // handshakeTimeout
830
        attributes = transportListener.transportReady(negotiationAttributes);
1✔
831
      }
832
    }
1✔
833

834
    @Override
835
    public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
836
        boolean endOfStream) throws Http2Exception {
837
      if (keepAliveManager != null) {
1✔
838
        keepAliveManager.onDataReceived();
1✔
839
      }
840
      NettyServerHandler.this.onDataRead(streamId, data, padding, endOfStream);
1✔
841
      return padding;
1✔
842
    }
843

844
    @Override
845
    public void onHeadersRead(ChannelHandlerContext ctx,
846
        int streamId,
847
        Http2Headers headers,
848
        int streamDependency,
849
        short weight,
850
        boolean exclusive,
851
        int padding,
852
        boolean endStream) throws Http2Exception {
853
      if (keepAliveManager != null) {
1✔
854
        keepAliveManager.onDataReceived();
1✔
855
      }
856
      NettyServerHandler.this.onHeadersRead(ctx, streamId, headers);
1✔
857
      if (endStream) {
1✔
858
        NettyServerHandler.this.onDataRead(streamId, Unpooled.EMPTY_BUFFER, 0, endStream);
×
859
      }
860
    }
1✔
861

862
    @Override
863
    public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode)
864
        throws Http2Exception {
865
      if (keepAliveManager != null) {
1✔
866
        keepAliveManager.onDataReceived();
1✔
867
      }
868
      NettyServerHandler.this.onRstStreamRead(streamId, errorCode);
1✔
869
    }
1✔
870

871
    @Override
872
    public void onPingRead(ChannelHandlerContext ctx, long data) throws Http2Exception {
873
      if (keepAliveManager != null) {
1✔
874
        keepAliveManager.onDataReceived();
1✔
875
      }
876
      if (!keepAliveEnforcer.pingAcceptable()) {
1✔
877
        ByteBuf debugData = ByteBufUtil.writeAscii(ctx.alloc(), "too_many_pings");
1✔
878
        goAway(ctx, connection().remote().lastStreamCreated(), Http2Error.ENHANCE_YOUR_CALM.code(),
1✔
879
            debugData, ctx.newPromise());
1✔
880
        Status status = Status.RESOURCE_EXHAUSTED.withDescription("Too many pings from client");
1✔
881
        try {
882
          forcefulClose(ctx, new ForcefulCloseCommand(status), ctx.newPromise());
1✔
883
        } catch (Exception ex) {
×
884
          onError(ctx, /* outbound= */ true, ex);
×
885
        }
1✔
886
      }
887
    }
1✔
888

889
    @Override
890
    public void onPingAckRead(ChannelHandlerContext ctx, long data) throws Http2Exception {
891
      if (keepAliveManager != null) {
1✔
892
        keepAliveManager.onDataReceived();
1✔
893
      }
894
      if (data == flowControlPing().payload()) {
1✔
895
        flowControlPing().updateWindow();
1✔
896
        logger.log(Level.FINE, "Window: {0}",
1✔
897
            decoder().flowController().initialWindowSize(connection().connectionStream()));
1✔
898
      } else if (data == GRACEFUL_SHUTDOWN_PING) {
1✔
899
        if (gracefulShutdown == null) {
1✔
900
          // this should never happen
901
          logger.warning("Received GRACEFUL_SHUTDOWN_PING Ack but gracefulShutdown is null");
×
902
        } else {
903
          gracefulShutdown.secondGoAwayAndClose(ctx);
1✔
904
        }
905
      } else if (data != KEEPALIVE_PING) {
1✔
906
        logger.warning("Received unexpected ping ack. No ping outstanding");
×
907
      }
908
    }
1✔
909
  }
910

911
  private final class KeepAlivePinger implements KeepAliveManager.KeepAlivePinger {
912
    final ChannelHandlerContext ctx;
913

914
    KeepAlivePinger(ChannelHandlerContext ctx) {
1✔
915
      this.ctx = ctx;
1✔
916
    }
1✔
917

918
    @Override
919
    public void ping() {
920
      ChannelFuture pingFuture = encoder().writePing(
1✔
921
          ctx, false /* isAck */, KEEPALIVE_PING, ctx.newPromise());
1✔
922
      ctx.flush();
1✔
923
      pingFuture.addListener(new ChannelFutureListener() {
1✔
924
        @Override
925
        public void operationComplete(ChannelFuture future) throws Exception {
926
          if (future.isSuccess()) {
1✔
927
            transportTracer.reportKeepAliveSent();
1✔
928
          }
929
        }
1✔
930
      });
931
    }
1✔
932

933
    @Override
934
    public void onPingTimeout() {
935
      try {
936
        forcefulClose(
1✔
937
            ctx,
938
            new ForcefulCloseCommand(Status.UNAVAILABLE
939
                .withDescription("Keepalive failed. The connection is likely gone")),
1✔
940
            ctx.newPromise());
1✔
941
      } catch (Exception ex) {
×
942
        try {
943
          exceptionCaught(ctx, ex);
×
944
        } catch (Exception ex2) {
×
945
          logger.log(Level.WARNING, "Exception while propagating exception", ex2);
×
946
          logger.log(Level.WARNING, "Original failure", ex);
×
947
        }
×
948
      }
1✔
949
    }
1✔
950
  }
951

952
  private final class GracefulShutdown {
953
    String goAwayMessage;
954

955
    /**
956
     * The grace time between starting graceful shutdown and closing the netty channel,
957
     * {@code null} is unspecified.
958
     */
959
    @CheckForNull
960
    Long graceTimeInNanos;
961

962
    /**
963
     * True if ping is Acked or ping is timeout.
964
     */
965
    boolean pingAckedOrTimeout;
966

967
    Future<?> pingFuture;
968

969
    GracefulShutdown(String goAwayMessage,
970
        @Nullable Long graceTimeInNanos) {
1✔
971
      this.goAwayMessage = goAwayMessage;
1✔
972
      this.graceTimeInNanos = graceTimeInNanos;
1✔
973
    }
1✔
974

975
    /**
976
     * Sends out first GOAWAY and ping, and schedules second GOAWAY and close.
977
     */
978
    void start(final ChannelHandlerContext ctx) {
979
      goAway(
1✔
980
          ctx,
981
          Integer.MAX_VALUE,
982
          Http2Error.NO_ERROR.code(),
1✔
983
          ByteBufUtil.writeAscii(ctx.alloc(), goAwayMessage),
1✔
984
          ctx.newPromise());
1✔
985

986
      pingFuture = ctx.executor().schedule(
1✔
987
          new Runnable() {
1✔
988
            @Override
989
            public void run() {
990
              secondGoAwayAndClose(ctx);
1✔
991
            }
1✔
992
          },
993
          GRACEFUL_SHUTDOWN_PING_TIMEOUT_NANOS,
1✔
994
          TimeUnit.NANOSECONDS);
995

996
      encoder().writePing(ctx, false /* isAck */, GRACEFUL_SHUTDOWN_PING, ctx.newPromise());
1✔
997
    }
1✔
998

999
    void secondGoAwayAndClose(ChannelHandlerContext ctx) {
1000
      if (pingAckedOrTimeout) {
1✔
1001
        return;
×
1002
      }
1003
      pingAckedOrTimeout = true;
1✔
1004

1005
      checkNotNull(pingFuture, "pingFuture");
1✔
1006
      pingFuture.cancel(false);
1✔
1007

1008
      // send the second GOAWAY with last stream id
1009
      goAway(
1✔
1010
          ctx,
1011
          connection().remote().lastStreamCreated(),
1✔
1012
          Http2Error.NO_ERROR.code(),
1✔
1013
          ByteBufUtil.writeAscii(ctx.alloc(), goAwayMessage),
1✔
1014
          ctx.newPromise());
1✔
1015

1016
      // gracefully shutdown with specified grace time
1017
      long savedGracefulShutdownTimeMillis = gracefulShutdownTimeoutMillis();
1✔
1018
      long overriddenGraceTime = graceTimeOverrideMillis(savedGracefulShutdownTimeMillis);
1✔
1019
      try {
1020
        gracefulShutdownTimeoutMillis(overriddenGraceTime);
1✔
1021
        NettyServerHandler.super.close(ctx, ctx.newPromise());
1✔
1022
      } catch (Exception e) {
×
1023
        onError(ctx, /* outbound= */ true, e);
×
1024
      } finally {
1025
        gracefulShutdownTimeoutMillis(savedGracefulShutdownTimeMillis);
1✔
1026
      }
1027
    }
1✔
1028

1029
    private long graceTimeOverrideMillis(long originalMillis) {
1030
      if (graceTimeInNanos == null) {
1✔
1031
        return originalMillis;
1✔
1032
      }
1033
      if (graceTimeInNanos == MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE) {
1✔
1034
        // netty treats -1 as "no timeout"
1035
        return -1L;
1✔
1036
      }
1037
      return TimeUnit.NANOSECONDS.toMillis(graceTimeInNanos);
1✔
1038
    }
1039
  }
1040

1041
  // Use a frame writer so that we know when frames are through flow control and actually being
1042
  // written.
1043
  private static class WriteMonitoringFrameWriter extends DecoratingHttp2FrameWriter {
1044
    private final KeepAliveEnforcer keepAliveEnforcer;
1045

1046
    public WriteMonitoringFrameWriter(Http2FrameWriter delegate,
1047
        KeepAliveEnforcer keepAliveEnforcer) {
1048
      super(delegate);
1✔
1049
      this.keepAliveEnforcer = keepAliveEnforcer;
1✔
1050
    }
1✔
1051

1052
    @Override
1053
    public ChannelFuture writeData(ChannelHandlerContext ctx, int streamId, ByteBuf data,
1054
        int padding, boolean endStream, ChannelPromise promise) {
1055
      keepAliveEnforcer.resetCounters();
1✔
1056
      return super.writeData(ctx, streamId, data, padding, endStream, promise);
1✔
1057
    }
1058

1059
    @Override
1060
    public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
1061
        int padding, boolean endStream, ChannelPromise promise) {
1062
      keepAliveEnforcer.resetCounters();
1✔
1063
      return super.writeHeaders(ctx, streamId, headers, padding, endStream, promise);
1✔
1064
    }
1065

1066
    @Override
1067
    public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
1068
        int streamDependency, short weight, boolean exclusive, int padding, boolean endStream,
1069
        ChannelPromise promise) {
1070
      keepAliveEnforcer.resetCounters();
×
1071
      return super.writeHeaders(ctx, streamId, headers, streamDependency, weight, exclusive,
×
1072
          padding, endStream, promise);
1073
    }
1074
  }
1075

1076
  private static class ServerChannelLogger extends ChannelLogger {
1077
    private static final Logger log = Logger.getLogger(ChannelLogger.class.getName());
1✔
1078

1079
    @Override
1080
    public void log(ChannelLogLevel level, String message) {
1081
      log.log(toJavaLogLevel(level), message);
1✔
1082
    }
1✔
1083

1084
    @Override
1085
    public void log(ChannelLogLevel level, String messageFormat, Object... args) {
1086
      log(level, MessageFormat.format(messageFormat, args));
1✔
1087
    }
1✔
1088
  }
1089

1090
  private static Level toJavaLogLevel(ChannelLogLevel level) {
1091
    switch (level) {
1✔
1092
      case ERROR:
1093
        return Level.FINE;
×
1094
      case WARNING:
1095
        return Level.FINER;
×
1096
      default:
1097
        return Level.FINEST;
1✔
1098
    }
1099
  }
1100
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc