• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19314

26 Jun 2024 09:30PM CUT coverage: 88.23%. Remained the same
#19314

push

github

web-flow
Revert "Start 1.63.3 development cycle (#11318)" (#11333)

This reverts commit 7ece6555b.

31206 of 35369 relevant lines covered (88.23%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.87
/../netty/src/main/java/io/grpc/netty/NettyServerHandler.java
1
/*
2
 * Copyright 2014 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.netty;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static io.grpc.internal.GrpcUtil.SERVER_KEEPALIVE_TIME_NANOS_DISABLED;
22
import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE;
23
import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_AGE_NANOS_DISABLED;
24
import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_IDLE_NANOS_DISABLED;
25
import static io.grpc.netty.Utils.CONTENT_TYPE_HEADER;
26
import static io.grpc.netty.Utils.HTTP_METHOD;
27
import static io.grpc.netty.Utils.TE_HEADER;
28
import static io.grpc.netty.Utils.TE_TRAILERS;
29
import static io.netty.handler.codec.http.HttpHeaderNames.CONNECTION;
30
import static io.netty.handler.codec.http.HttpHeaderNames.HOST;
31
import static io.netty.handler.codec.http2.DefaultHttp2LocalFlowController.DEFAULT_WINDOW_UPDATE_RATIO;
32
import static io.netty.handler.codec.http2.Http2Headers.PseudoHeaderName.AUTHORITY;
33

34
import com.google.common.annotations.VisibleForTesting;
35
import com.google.common.base.Preconditions;
36
import com.google.common.base.Strings;
37
import com.google.common.base.Ticker;
38
import io.grpc.Attributes;
39
import io.grpc.ChannelLogger;
40
import io.grpc.ChannelLogger.ChannelLogLevel;
41
import io.grpc.InternalChannelz;
42
import io.grpc.InternalMetadata;
43
import io.grpc.InternalStatus;
44
import io.grpc.Metadata;
45
import io.grpc.ServerStreamTracer;
46
import io.grpc.Status;
47
import io.grpc.internal.GrpcUtil;
48
import io.grpc.internal.KeepAliveEnforcer;
49
import io.grpc.internal.KeepAliveManager;
50
import io.grpc.internal.LogExceptionRunnable;
51
import io.grpc.internal.MaxConnectionIdleManager;
52
import io.grpc.internal.ServerTransportListener;
53
import io.grpc.internal.StatsTraceContext;
54
import io.grpc.internal.TransportTracer;
55
import io.grpc.netty.GrpcHttp2HeadersUtils.GrpcHttp2ServerHeadersDecoder;
56
import io.netty.buffer.ByteBuf;
57
import io.netty.buffer.ByteBufUtil;
58
import io.netty.buffer.Unpooled;
59
import io.netty.channel.ChannelFuture;
60
import io.netty.channel.ChannelFutureListener;
61
import io.netty.channel.ChannelHandlerContext;
62
import io.netty.channel.ChannelPromise;
63
import io.netty.handler.codec.http2.DecoratingHttp2FrameWriter;
64
import io.netty.handler.codec.http2.DefaultHttp2Connection;
65
import io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder;
66
import io.netty.handler.codec.http2.DefaultHttp2ConnectionEncoder;
67
import io.netty.handler.codec.http2.DefaultHttp2FrameReader;
68
import io.netty.handler.codec.http2.DefaultHttp2FrameWriter;
69
import io.netty.handler.codec.http2.DefaultHttp2Headers;
70
import io.netty.handler.codec.http2.DefaultHttp2LocalFlowController;
71
import io.netty.handler.codec.http2.DefaultHttp2RemoteFlowController;
72
import io.netty.handler.codec.http2.Http2Connection;
73
import io.netty.handler.codec.http2.Http2ConnectionAdapter;
74
import io.netty.handler.codec.http2.Http2ConnectionDecoder;
75
import io.netty.handler.codec.http2.Http2ConnectionEncoder;
76
import io.netty.handler.codec.http2.Http2Error;
77
import io.netty.handler.codec.http2.Http2Exception;
78
import io.netty.handler.codec.http2.Http2Exception.StreamException;
79
import io.netty.handler.codec.http2.Http2FrameAdapter;
80
import io.netty.handler.codec.http2.Http2FrameLogger;
81
import io.netty.handler.codec.http2.Http2FrameReader;
82
import io.netty.handler.codec.http2.Http2FrameWriter;
83
import io.netty.handler.codec.http2.Http2Headers;
84
import io.netty.handler.codec.http2.Http2HeadersDecoder;
85
import io.netty.handler.codec.http2.Http2InboundFrameLogger;
86
import io.netty.handler.codec.http2.Http2OutboundFrameLogger;
87
import io.netty.handler.codec.http2.Http2Settings;
88
import io.netty.handler.codec.http2.Http2Stream;
89
import io.netty.handler.codec.http2.Http2StreamVisitor;
90
import io.netty.handler.codec.http2.WeightedFairQueueByteDistributor;
91
import io.netty.handler.logging.LogLevel;
92
import io.netty.util.AsciiString;
93
import io.netty.util.ReferenceCountUtil;
94
import io.perfmark.PerfMark;
95
import io.perfmark.Tag;
96
import io.perfmark.TaskCloseable;
97
import java.text.MessageFormat;
98
import java.util.List;
99
import java.util.concurrent.Future;
100
import java.util.concurrent.ScheduledFuture;
101
import java.util.concurrent.TimeUnit;
102
import java.util.logging.Level;
103
import java.util.logging.Logger;
104
import javax.annotation.CheckForNull;
105
import javax.annotation.Nullable;
106

107
/**
108
 * Server-side Netty handler for GRPC processing. All event handlers are executed entirely within
109
 * the context of the Netty Channel thread.
110
 */
111
class NettyServerHandler extends AbstractNettyHandler {
112
  private static final Logger logger = Logger.getLogger(NettyServerHandler.class.getName());
1✔
113
  private static final long KEEPALIVE_PING = 0xDEADL;
114
  @VisibleForTesting
115
  static final long GRACEFUL_SHUTDOWN_PING = 0x97ACEF001L;
116
  private static final long GRACEFUL_SHUTDOWN_PING_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(10);
1✔
117
  /** Temporary workaround for #8674. Fine to delete after v1.45 release, and maybe earlier. */
118
  private static final boolean DISABLE_CONNECTION_HEADER_CHECK = Boolean.parseBoolean(
1✔
119
      System.getProperty("io.grpc.netty.disableConnectionHeaderCheck", "false"));
1✔
120

121
  private final Http2Connection.PropertyKey streamKey;
122
  private final ServerTransportListener transportListener;
123
  private final int maxMessageSize;
124
  private final long keepAliveTimeInNanos;
125
  private final long keepAliveTimeoutInNanos;
126
  private final long maxConnectionAgeInNanos;
127
  private final long maxConnectionAgeGraceInNanos;
128
  private final int maxRstCount;
129
  private final long maxRstPeriodNanos;
130
  private final List<? extends ServerStreamTracer.Factory> streamTracerFactories;
131
  private final TransportTracer transportTracer;
132
  private final KeepAliveEnforcer keepAliveEnforcer;
133
  private final Attributes eagAttributes;
134
  private final Ticker ticker;
135
  /** Incomplete attributes produced by negotiator. */
136
  private Attributes negotiationAttributes;
137
  private InternalChannelz.Security securityInfo;
138
  /** Completed attributes produced by transportReady. */
139
  private Attributes attributes;
140
  private Throwable connectionError;
141
  private boolean teWarningLogged;
142
  private WriteQueue serverWriteQueue;
143
  private AsciiString lastKnownAuthority;
144
  @CheckForNull
145
  private KeepAliveManager keepAliveManager;
146
  @CheckForNull
147
  private MaxConnectionIdleManager maxConnectionIdleManager;
148
  @CheckForNull
149
  private ScheduledFuture<?> maxConnectionAgeMonitor;
150
  @CheckForNull
151
  private GracefulShutdown gracefulShutdown;
152
  private int rstCount;
153
  private long lastRstNanoTime;
154

155

156
  static NettyServerHandler newHandler(
157
      ServerTransportListener transportListener,
158
      ChannelPromise channelUnused,
159
      List<? extends ServerStreamTracer.Factory> streamTracerFactories,
160
      TransportTracer transportTracer,
161
      int maxStreams,
162
      boolean autoFlowControl,
163
      int flowControlWindow,
164
      int maxHeaderListSize,
165
      int maxMessageSize,
166
      long keepAliveTimeInNanos,
167
      long keepAliveTimeoutInNanos,
168
      long maxConnectionIdleInNanos,
169
      long maxConnectionAgeInNanos,
170
      long maxConnectionAgeGraceInNanos,
171
      boolean permitKeepAliveWithoutCalls,
172
      long permitKeepAliveTimeInNanos,
173
      int maxRstCount,
174
      long maxRstPeriodNanos,
175
      Attributes eagAttributes) {
176
    Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive: %s",
1✔
177
        maxHeaderListSize);
178
    Http2FrameLogger frameLogger = new Http2FrameLogger(LogLevel.DEBUG, NettyServerHandler.class);
1✔
179
    Http2HeadersDecoder headersDecoder = new GrpcHttp2ServerHeadersDecoder(maxHeaderListSize);
1✔
180
    Http2FrameReader frameReader = new Http2InboundFrameLogger(
1✔
181
        new DefaultHttp2FrameReader(headersDecoder), frameLogger);
182
    Http2FrameWriter frameWriter =
1✔
183
        new Http2OutboundFrameLogger(new DefaultHttp2FrameWriter(), frameLogger);
184
    return newHandler(
1✔
185
        channelUnused,
186
        frameReader,
187
        frameWriter,
188
        transportListener,
189
        streamTracerFactories,
190
        transportTracer,
191
        maxStreams,
192
        autoFlowControl,
193
        flowControlWindow,
194
        maxHeaderListSize,
195
        maxMessageSize,
196
        keepAliveTimeInNanos,
197
        keepAliveTimeoutInNanos,
198
        maxConnectionIdleInNanos,
199
        maxConnectionAgeInNanos,
200
        maxConnectionAgeGraceInNanos,
201
        permitKeepAliveWithoutCalls,
202
        permitKeepAliveTimeInNanos,
203
        maxRstCount,
204
        maxRstPeriodNanos,
205
        eagAttributes,
206
        Ticker.systemTicker());
1✔
207
  }
208

209
  static NettyServerHandler newHandler(
210
      ChannelPromise channelUnused,
211
      Http2FrameReader frameReader,
212
      Http2FrameWriter frameWriter,
213
      ServerTransportListener transportListener,
214
      List<? extends ServerStreamTracer.Factory> streamTracerFactories,
215
      TransportTracer transportTracer,
216
      int maxStreams,
217
      boolean autoFlowControl,
218
      int flowControlWindow,
219
      int maxHeaderListSize,
220
      int maxMessageSize,
221
      long keepAliveTimeInNanos,
222
      long keepAliveTimeoutInNanos,
223
      long maxConnectionIdleInNanos,
224
      long maxConnectionAgeInNanos,
225
      long maxConnectionAgeGraceInNanos,
226
      boolean permitKeepAliveWithoutCalls,
227
      long permitKeepAliveTimeInNanos,
228
      int maxRstCount,
229
      long maxRstPeriodNanos,
230
      Attributes eagAttributes,
231
      Ticker ticker) {
232
    Preconditions.checkArgument(maxStreams > 0, "maxStreams must be positive: %s", maxStreams);
1✔
233
    Preconditions.checkArgument(flowControlWindow > 0, "flowControlWindow must be positive: %s",
1✔
234
        flowControlWindow);
235
    Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive: %s",
1✔
236
        maxHeaderListSize);
237
    Preconditions.checkArgument(maxMessageSize > 0, "maxMessageSize must be positive: %s",
1✔
238
        maxMessageSize);
239

240
    final Http2Connection connection = new DefaultHttp2Connection(true);
1✔
241
    WeightedFairQueueByteDistributor dist = new WeightedFairQueueByteDistributor(connection);
1✔
242
    dist.allocationQuantum(16 * 1024); // Make benchmarks fast again.
1✔
243
    DefaultHttp2RemoteFlowController controller =
1✔
244
        new DefaultHttp2RemoteFlowController(connection, dist);
245
    connection.remote().flowController(controller);
1✔
246
    final KeepAliveEnforcer keepAliveEnforcer = new KeepAliveEnforcer(
1✔
247
        permitKeepAliveWithoutCalls, permitKeepAliveTimeInNanos, TimeUnit.NANOSECONDS);
248

249
    // Create the local flow controller configured to auto-refill the connection window.
250
    connection.local().flowController(
1✔
251
        new DefaultHttp2LocalFlowController(connection, DEFAULT_WINDOW_UPDATE_RATIO, true));
252
    frameWriter = new WriteMonitoringFrameWriter(frameWriter, keepAliveEnforcer);
1✔
253
    Http2ConnectionEncoder encoder =
1✔
254
        new DefaultHttp2ConnectionEncoder(connection, frameWriter);
255
    encoder = new Http2ControlFrameLimitEncoder(encoder, 10000);
1✔
256
    Http2ConnectionDecoder decoder = new DefaultHttp2ConnectionDecoder(connection, encoder,
1✔
257
        frameReader);
258

259
    Http2Settings settings = new Http2Settings();
1✔
260
    settings.initialWindowSize(flowControlWindow);
1✔
261
    settings.maxConcurrentStreams(maxStreams);
1✔
262
    settings.maxHeaderListSize(maxHeaderListSize);
1✔
263

264
    if (ticker == null) {
1✔
265
      ticker = Ticker.systemTicker();
×
266
    }
267

268
    return new NettyServerHandler(
1✔
269
        channelUnused,
270
        connection,
271
        transportListener,
272
        streamTracerFactories,
273
        transportTracer,
274
        decoder, encoder, settings,
275
        maxMessageSize,
276
        keepAliveTimeInNanos, keepAliveTimeoutInNanos,
277
        maxConnectionIdleInNanos,
278
        maxConnectionAgeInNanos, maxConnectionAgeGraceInNanos,
279
        keepAliveEnforcer,
280
        autoFlowControl,
281
        maxRstCount,
282
        maxRstPeriodNanos,
283
        eagAttributes, ticker);
284
  }
285

286
  private NettyServerHandler(
287
      ChannelPromise channelUnused,
288
      final Http2Connection connection,
289
      ServerTransportListener transportListener,
290
      List<? extends ServerStreamTracer.Factory> streamTracerFactories,
291
      TransportTracer transportTracer,
292
      Http2ConnectionDecoder decoder,
293
      Http2ConnectionEncoder encoder,
294
      Http2Settings settings,
295
      int maxMessageSize,
296
      long keepAliveTimeInNanos,
297
      long keepAliveTimeoutInNanos,
298
      long maxConnectionIdleInNanos,
299
      long maxConnectionAgeInNanos,
300
      long maxConnectionAgeGraceInNanos,
301
      final KeepAliveEnforcer keepAliveEnforcer,
302
      boolean autoFlowControl,
303
      int maxRstCount,
304
      long maxRstPeriodNanos,
305
      Attributes eagAttributes,
306
      Ticker ticker) {
307
    super(channelUnused, decoder, encoder, settings, new ServerChannelLogger(),
1✔
308
        autoFlowControl, null, ticker);
309

310
    final MaxConnectionIdleManager maxConnectionIdleManager;
311
    if (maxConnectionIdleInNanos == MAX_CONNECTION_IDLE_NANOS_DISABLED) {
1✔
312
      maxConnectionIdleManager = null;
1✔
313
    } else {
314
      maxConnectionIdleManager = new MaxConnectionIdleManager(maxConnectionIdleInNanos);
1✔
315
    }
316

317
    connection.addListener(new Http2ConnectionAdapter() {
1✔
318
      @Override
319
      public void onStreamActive(Http2Stream stream) {
320
        if (connection.numActiveStreams() == 1) {
1✔
321
          keepAliveEnforcer.onTransportActive();
1✔
322
          if (maxConnectionIdleManager != null) {
1✔
323
            maxConnectionIdleManager.onTransportActive();
1✔
324
          }
325
        }
326
      }
1✔
327

328
      @Override
329
      public void onStreamClosed(Http2Stream stream) {
330
        if (connection.numActiveStreams() == 0) {
1✔
331
          keepAliveEnforcer.onTransportIdle();
1✔
332
          if (maxConnectionIdleManager != null) {
1✔
333
            maxConnectionIdleManager.onTransportIdle();
1✔
334
          }
335
        }
336
      }
1✔
337
    });
338

339
    checkArgument(maxMessageSize >= 0, "maxMessageSize must be non-negative: %s", maxMessageSize);
1✔
340
    this.maxMessageSize = maxMessageSize;
1✔
341
    this.keepAliveTimeInNanos = keepAliveTimeInNanos;
1✔
342
    this.keepAliveTimeoutInNanos = keepAliveTimeoutInNanos;
1✔
343
    this.maxConnectionIdleManager = maxConnectionIdleManager;
1✔
344
    this.maxConnectionAgeInNanos = maxConnectionAgeInNanos;
1✔
345
    this.maxConnectionAgeGraceInNanos = maxConnectionAgeGraceInNanos;
1✔
346
    this.keepAliveEnforcer = checkNotNull(keepAliveEnforcer, "keepAliveEnforcer");
1✔
347
    this.maxRstCount = maxRstCount;
1✔
348
    this.maxRstPeriodNanos = maxRstPeriodNanos;
1✔
349
    this.eagAttributes = checkNotNull(eagAttributes, "eagAttributes");
1✔
350
    this.ticker = checkNotNull(ticker, "ticker");
1✔
351

352
    this.lastRstNanoTime = ticker.read();
1✔
353
    streamKey = encoder.connection().newKey();
1✔
354
    this.transportListener = checkNotNull(transportListener, "transportListener");
1✔
355
    this.streamTracerFactories = checkNotNull(streamTracerFactories, "streamTracerFactories");
1✔
356
    this.transportTracer = checkNotNull(transportTracer, "transportTracer");
1✔
357
    // Set the frame listener on the decoder.
358
    decoder().frameListener(new FrameListener());
1✔
359
  }
1✔
360

361
  @Nullable
362
  Throwable connectionError() {
363
    return connectionError;
1✔
364
  }
365

366
  @Override
367
  public void handlerAdded(final ChannelHandlerContext ctx) throws Exception {
368
    serverWriteQueue = new WriteQueue(ctx.channel());
1✔
369

370
    // init max connection age monitor
371
    if (maxConnectionAgeInNanos != MAX_CONNECTION_AGE_NANOS_DISABLED) {
1✔
372
      maxConnectionAgeMonitor = ctx.executor().schedule(
1✔
373
          new LogExceptionRunnable(new Runnable() {
1✔
374
            @Override
375
            public void run() {
376
              if (gracefulShutdown == null) {
1✔
377
                gracefulShutdown = new GracefulShutdown("max_age", maxConnectionAgeGraceInNanos);
1✔
378
                gracefulShutdown.start(ctx);
1✔
379
                ctx.flush();
1✔
380
              }
381
            }
1✔
382
          }),
383
          maxConnectionAgeInNanos,
384
          TimeUnit.NANOSECONDS);
385
    }
386

387
    if (maxConnectionIdleManager != null) {
1✔
388
      maxConnectionIdleManager.start(new Runnable() {
1✔
389
        @Override
390
        public void run() {
391
          if (gracefulShutdown == null) {
1✔
392
            gracefulShutdown = new GracefulShutdown("max_idle", null);
1✔
393
            gracefulShutdown.start(ctx);
1✔
394
            ctx.flush();
1✔
395
          }
396
        }
1✔
397
      }, ctx.executor());
1✔
398
    }
399

400
    if (keepAliveTimeInNanos != SERVER_KEEPALIVE_TIME_NANOS_DISABLED) {
1✔
401
      keepAliveManager = new KeepAliveManager(new KeepAlivePinger(ctx), ctx.executor(),
1✔
402
          keepAliveTimeInNanos, keepAliveTimeoutInNanos, true /* keepAliveDuringTransportIdle */);
403
      keepAliveManager.onTransportStarted();
1✔
404
    }
405

406
    assert encoder().connection().equals(decoder().connection());
1✔
407
    transportTracer.setFlowControlWindowReader(new Utils.FlowControlReader(encoder().connection()));
1✔
408

409
    super.handlerAdded(ctx);
1✔
410
  }
1✔
411

412
  private void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers)
413
      throws Http2Exception {
414
    try {
415
      // Connection-specific header fields makes a request malformed. Ideally this would be handled
416
      // by Netty. RFC 7540 section 8.1.2.2
417
      if (!DISABLE_CONNECTION_HEADER_CHECK && headers.contains(CONNECTION)) {
1✔
418
        resetStream(ctx, streamId, Http2Error.PROTOCOL_ERROR.code(), ctx.newPromise());
×
419
        return;
×
420
      }
421

422
      if (headers.authority() == null) {
1✔
423
        List<CharSequence> hosts = headers.getAll(HOST);
1✔
424
        if (hosts.size() > 1) {
1✔
425
          // RFC 7230 section 5.4
426
          respondWithHttpError(ctx, streamId, 400, Status.Code.INTERNAL,
1✔
427
              "Multiple host headers");
428
          return;
1✔
429
        }
430
        if (!hosts.isEmpty()) {
1✔
431
          headers.add(AUTHORITY.value(), hosts.get(0));
1✔
432
        }
433
      }
434
      headers.remove(HOST);
1✔
435

436
      // Remove the leading slash of the path and get the fully qualified method name
437
      CharSequence path = headers.path();
1✔
438

439
      if (path == null) {
1✔
440
        respondWithHttpError(ctx, streamId, 404, Status.Code.UNIMPLEMENTED,
1✔
441
            "Expected path but is missing");
442
        return;
1✔
443
      }
444

445
      if (path.charAt(0) != '/') {
1✔
446
        respondWithHttpError(ctx, streamId, 404, Status.Code.UNIMPLEMENTED,
1✔
447
            String.format("Expected path to start with /: %s", path));
1✔
448
        return;
1✔
449
      }
450

451
      String method = path.subSequence(1, path.length()).toString();
1✔
452

453
      // Verify that the Content-Type is correct in the request.
454
      CharSequence contentType = headers.get(CONTENT_TYPE_HEADER);
1✔
455
      if (contentType == null) {
1✔
456
        respondWithHttpError(
1✔
457
            ctx, streamId, 415, Status.Code.INTERNAL, "Content-Type is missing from the request");
458
        return;
1✔
459
      }
460
      String contentTypeString = contentType.toString();
1✔
461
      if (!GrpcUtil.isGrpcContentType(contentTypeString)) {
1✔
462
        respondWithHttpError(ctx, streamId, 415, Status.Code.INTERNAL,
1✔
463
            String.format("Content-Type '%s' is not supported", contentTypeString));
1✔
464
        return;
1✔
465
      }
466

467
      if (!HTTP_METHOD.contentEquals(headers.method())) {
1✔
468
        respondWithHttpError(ctx, streamId, 405, Status.Code.INTERNAL,
1✔
469
            String.format("Method '%s' is not supported", headers.method()));
1✔
470
        return;
1✔
471
      }
472

473
      if (!teWarningLogged && !TE_TRAILERS.contentEquals(headers.get(TE_HEADER))) {
1✔
474
        logger.warning(String.format("Expected header TE: %s, but %s is received. This means "
1✔
475
                + "some intermediate proxy may not support trailers",
476
            TE_TRAILERS, headers.get(TE_HEADER)));
1✔
477
        teWarningLogged = true;
1✔
478
      }
479

480
      // The Http2Stream object was put by AbstractHttp2ConnectionHandler before calling this
481
      // method.
482
      Http2Stream http2Stream = requireHttp2Stream(streamId);
1✔
483

484
      Metadata metadata = Utils.convertHeaders(headers);
1✔
485
      StatsTraceContext statsTraceCtx =
1✔
486
          StatsTraceContext.newServerContext(streamTracerFactories, method, metadata);
1✔
487

488
      NettyServerStream.TransportState state = new NettyServerStream.TransportState(
1✔
489
          this,
490
          ctx.channel().eventLoop(),
1✔
491
          http2Stream,
492
          maxMessageSize,
493
          statsTraceCtx,
494
          transportTracer,
495
          method);
496

497
      try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.onHeadersRead")) {
1✔
498
        PerfMark.attachTag(state.tag());
1✔
499
        String authority = getOrUpdateAuthority((AsciiString) headers.authority());
1✔
500
        NettyServerStream stream = new NettyServerStream(
1✔
501
            ctx.channel(),
1✔
502
            state,
503
            attributes,
504
            authority,
505
            statsTraceCtx,
506
            transportTracer);
507
        transportListener.streamCreated(stream, method, metadata);
1✔
508
        state.onStreamAllocated();
1✔
509
        http2Stream.setProperty(streamKey, state);
1✔
510
      }
511
    } catch (Exception e) {
×
512
      logger.log(Level.WARNING, "Exception in onHeadersRead()", e);
×
513
      // Throw an exception that will get handled by onStreamError.
514
      throw newStreamException(streamId, e);
×
515
    }
1✔
516
  }
1✔
517

518
  private String getOrUpdateAuthority(AsciiString authority) {
519
    if (authority == null) {
1✔
520
      return null;
1✔
521
    } else if (!authority.equals(lastKnownAuthority)) {
1✔
522
      lastKnownAuthority = authority;
1✔
523
    }
524

525
    // AsciiString.toString() is internally cached, so subsequent calls will not
526
    // result in recomputing the String representation of lastKnownAuthority.
527
    return lastKnownAuthority.toString();
1✔
528
  }
529

530
  private void onDataRead(int streamId, ByteBuf data, int padding, boolean endOfStream)
531
      throws Http2Exception {
532
    flowControlPing().onDataRead(data.readableBytes(), padding);
1✔
533
    try {
534
      NettyServerStream.TransportState stream = serverStream(requireHttp2Stream(streamId));
1✔
535
      if (stream == null) {
1✔
536
        return;
1✔
537
      }
538
      try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.onDataRead")) {
1✔
539
        PerfMark.attachTag(stream.tag());
1✔
540
        stream.inboundDataReceived(data, endOfStream);
1✔
541
      }
542
    } catch (Throwable e) {
×
543
      logger.log(Level.WARNING, "Exception in onDataRead()", e);
×
544
      // Throw an exception that will get handled by onStreamError.
545
      throw newStreamException(streamId, e);
×
546
    }
1✔
547
  }
1✔
548

549
  private void onRstStreamRead(int streamId, long errorCode) throws Http2Exception {
550
    if (maxRstCount > 0) {
1✔
551
      long now = ticker.read();
1✔
552
      if (now - lastRstNanoTime > maxRstPeriodNanos) {
1✔
553
        lastRstNanoTime = now;
1✔
554
        rstCount = 1;
1✔
555
      } else {
556
        rstCount++;
1✔
557
        if (rstCount > maxRstCount) {
1✔
558
          throw new Http2Exception(Http2Error.ENHANCE_YOUR_CALM, "too_many_rststreams") {
1✔
559
            @SuppressWarnings("UnsynchronizedOverridesSynchronized") // No memory accesses
560
            @Override
561
            public Throwable fillInStackTrace() {
562
              // Avoid the CPU cycles, since the resets may be a CPU consumption attack
563
              return this;
1✔
564
            }
565
          };
566
        }
567
      }
568
    }
569

570
    try {
571
      NettyServerStream.TransportState stream = serverStream(connection().stream(streamId));
1✔
572
      if (stream != null) {
1✔
573
        try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.onRstStreamRead")) {
1✔
574
          PerfMark.attachTag(stream.tag());
1✔
575
          stream.transportReportStatus(
1✔
576
              Status.CANCELLED.withDescription("RST_STREAM received for code " + errorCode));
1✔
577
        }
578
      }
579
    } catch (Throwable e) {
1✔
580
      logger.log(Level.WARNING, "Exception in onRstStreamRead()", e);
1✔
581
      // Throw an exception that will get handled by onStreamError.
582
      throw newStreamException(streamId, e);
1✔
583
    }
1✔
584
  }
1✔
585

586
  @Override
587
  protected void onConnectionError(ChannelHandlerContext ctx, boolean outbound, Throwable cause,
588
      Http2Exception http2Ex) {
589
    logger.log(Level.FINE, "Connection Error", cause);
1✔
590
    connectionError = cause;
1✔
591
    super.onConnectionError(ctx, outbound, cause, http2Ex);
1✔
592
  }
1✔
593

594
  @Override
595
  protected void onStreamError(ChannelHandlerContext ctx, boolean outbound, Throwable cause,
596
      StreamException http2Ex) {
597
    NettyServerStream.TransportState serverStream = serverStream(
1✔
598
        connection().stream(Http2Exception.streamId(http2Ex)));
1✔
599
    Level level = Level.WARNING;
1✔
600
    if (serverStream == null && http2Ex.error() == Http2Error.STREAM_CLOSED) {
1✔
601
      level = Level.FINE;
×
602
    }
603
    logger.log(level, "Stream Error", cause);
1✔
604
    Tag tag = serverStream != null ? serverStream.tag() : PerfMark.createTag();
1✔
605
    try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.onStreamError")) {
1✔
606
      PerfMark.attachTag(tag);
1✔
607
      if (serverStream != null) {
1✔
608
        serverStream.transportReportStatus(Utils.statusFromThrowable(cause));
1✔
609
      }
610
      // TODO(ejona): Abort the stream by sending headers to help the client with debugging.
611
      // Delegate to the base class to send a RST_STREAM.
612
      super.onStreamError(ctx, outbound, cause, http2Ex);
1✔
613
    }
614
  }
1✔
615

616
  @Override
617
  public void handleProtocolNegotiationCompleted(
618
      Attributes attrs, InternalChannelz.Security securityInfo) {
619
    negotiationAttributes = attrs;
1✔
620
    this.securityInfo = securityInfo;
1✔
621
    super.handleProtocolNegotiationCompleted(attrs, securityInfo);
1✔
622
    NettyClientHandler.writeBufferingAndRemove(ctx().channel());
1✔
623
  }
1✔
624

625
  @Override
626
  public Attributes getEagAttributes() {
627
    return eagAttributes;
1✔
628
  }
629

630
  InternalChannelz.Security getSecurityInfo() {
631
    return securityInfo;
1✔
632
  }
633

634
  @VisibleForTesting
635
  KeepAliveManager getKeepAliveManagerForTest() {
636
    return keepAliveManager;
1✔
637
  }
638

639
  @VisibleForTesting
640
  void setKeepAliveManagerForTest(KeepAliveManager keepAliveManager) {
641
    this.keepAliveManager = keepAliveManager;
1✔
642
  }
1✔
643

644
  /**
645
   * Handler for the Channel shutting down.
646
   */
647
  @Override
648
  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
649
    try {
650
      if (keepAliveManager != null) {
1✔
651
        keepAliveManager.onTransportTermination();
1✔
652
      }
653
      if (maxConnectionIdleManager != null) {
1✔
654
        maxConnectionIdleManager.onTransportTermination();
1✔
655
      }
656
      if (maxConnectionAgeMonitor != null) {
1✔
657
        maxConnectionAgeMonitor.cancel(false);
1✔
658
      }
659
      final Status status =
1✔
660
          Status.UNAVAILABLE.withDescription("connection terminated for unknown reason");
1✔
661
      // Any streams that are still active must be closed
662
      connection().forEachActiveStream(new Http2StreamVisitor() {
1✔
663
        @Override
664
        public boolean visit(Http2Stream stream) throws Http2Exception {
665
          NettyServerStream.TransportState serverStream = serverStream(stream);
1✔
666
          if (serverStream != null) {
1✔
667
            serverStream.transportReportStatus(status);
1✔
668
          }
669
          return true;
1✔
670
        }
671
      });
672
    } finally {
673
      super.channelInactive(ctx);
1✔
674
    }
675
  }
1✔
676

677
  WriteQueue getWriteQueue() {
678
    return serverWriteQueue;
1✔
679
  }
680

681
  /** Handler for commands sent from the stream. */
682
  @Override
683
  public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
684
      throws Exception {
685
    if (msg instanceof SendGrpcFrameCommand) {
1✔
686
      sendGrpcFrame(ctx, (SendGrpcFrameCommand) msg, promise);
1✔
687
    } else if (msg instanceof SendResponseHeadersCommand) {
1✔
688
      sendResponseHeaders(ctx, (SendResponseHeadersCommand) msg, promise);
1✔
689
    } else if (msg instanceof CancelServerStreamCommand) {
1✔
690
      cancelStream(ctx, (CancelServerStreamCommand) msg, promise);
1✔
691
    } else if (msg instanceof GracefulServerCloseCommand) {
1✔
692
      gracefulClose(ctx, (GracefulServerCloseCommand) msg, promise);
1✔
693
    } else if (msg instanceof ForcefulCloseCommand) {
1✔
694
      forcefulClose(ctx, (ForcefulCloseCommand) msg, promise);
1✔
695
    } else {
696
      AssertionError e =
×
697
          new AssertionError("Write called for unexpected type: " + msg.getClass().getName());
×
698
      ReferenceCountUtil.release(msg);
×
699
      promise.setFailure(e);
×
700
      throw e;
×
701
    }
702
  }
1✔
703

704
  @Override
705
  public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
706
    gracefulClose(ctx, new GracefulServerCloseCommand("app_requested"), promise);
1✔
707
    ctx.flush();
1✔
708
  }
1✔
709

710
  /**
711
   * Returns the given processed bytes back to inbound flow control.
712
   */
713
  void returnProcessedBytes(Http2Stream http2Stream, int bytes) {
714
    try {
715
      decoder().flowController().consumeBytes(http2Stream, bytes);
1✔
716
    } catch (Http2Exception e) {
×
717
      throw new RuntimeException(e);
×
718
    }
1✔
719
  }
1✔
720

721
  private void closeStreamWhenDone(ChannelPromise promise, Http2Stream stream) {
722
    promise.addListener(
1✔
723
        new ChannelFutureListener() {
1✔
724
          @Override
725
          public void operationComplete(ChannelFuture future) {
726
            serverStream(stream).complete();
1✔
727
          }
1✔
728
        });
729
  }
1✔
730

731
  private static void streamGone(int streamId, ChannelPromise promise) {
732
    promise.setFailure(
1✔
733
        new IllegalStateException(
734
            "attempting to write to stream " + streamId + " that no longer exists") {
1✔
735
          @Override
736
          public synchronized Throwable fillInStackTrace() {
737
            return this;
1✔
738
          }
739
        });
740
  }
1✔
741

742
  /** Sends the given gRPC frame to the client. */
743
  private void sendGrpcFrame(
744
      ChannelHandlerContext ctx, SendGrpcFrameCommand cmd, ChannelPromise promise)
745
      throws Http2Exception {
746
    try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.sendGrpcFrame")) {
1✔
747
      PerfMark.attachTag(cmd.stream().tag());
1✔
748
      PerfMark.linkIn(cmd.getLink());
1✔
749
      int streamId = cmd.stream().id();
1✔
750
      Http2Stream stream = connection().stream(streamId);
1✔
751
      if (stream == null) {
1✔
752
        cmd.release();
1✔
753
        streamGone(streamId, promise);
1✔
754
        return;
1✔
755
      }
756
      if (cmd.endStream()) {
1✔
757
        closeStreamWhenDone(promise, stream);
×
758
      }
759
      // Call the base class to write the HTTP/2 DATA frame.
760
      encoder().writeData(ctx, streamId, cmd.content(), 0, cmd.endStream(), promise);
1✔
761
    }
762
  }
1✔
763

764
  /**
765
   * Sends the response headers to the client.
766
   */
767
  private void sendResponseHeaders(ChannelHandlerContext ctx, SendResponseHeadersCommand cmd,
768
      ChannelPromise promise) throws Http2Exception {
769
    try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.sendResponseHeaders")) {
1✔
770
      PerfMark.attachTag(cmd.stream().tag());
1✔
771
      PerfMark.linkIn(cmd.getLink());
1✔
772
      int streamId = cmd.stream().id();
1✔
773
      Http2Stream stream = connection().stream(streamId);
1✔
774
      if (stream == null) {
1✔
775
        streamGone(streamId, promise);
1✔
776
        return;
1✔
777
      }
778
      if (cmd.endOfStream()) {
1✔
779
        closeStreamWhenDone(promise, stream);
1✔
780
      }
781
      encoder().writeHeaders(ctx, streamId, cmd.headers(), 0, cmd.endOfStream(), promise);
1✔
782
    }
783
  }
1✔
784

785
  private void cancelStream(ChannelHandlerContext ctx, CancelServerStreamCommand cmd,
786
      ChannelPromise promise) {
787
    try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.cancelStream")) {
1✔
788
      PerfMark.attachTag(cmd.stream().tag());
1✔
789
      PerfMark.linkIn(cmd.getLink());
1✔
790
      // Notify the listener if we haven't already.
791
      cmd.stream().transportReportStatus(cmd.reason());
1✔
792
      // Terminate the stream.
793
      encoder().writeRstStream(ctx, cmd.stream().id(), Http2Error.CANCEL.code(), promise);
1✔
794
    }
795
  }
1✔
796

797
  private void gracefulClose(final ChannelHandlerContext ctx, final GracefulServerCloseCommand msg,
798
      ChannelPromise promise) throws Exception {
799
    // Ideally we'd adjust a pre-existing graceful shutdown's grace period to at least what is
800
    // requested here. But that's an edge case and seems bug-prone.
801
    if (gracefulShutdown == null) {
1✔
802
      Long graceTimeInNanos = null;
1✔
803
      if (msg.getGraceTimeUnit() != null) {
1✔
804
        graceTimeInNanos = msg.getGraceTimeUnit().toNanos(msg.getGraceTime());
1✔
805
      }
806
      gracefulShutdown = new GracefulShutdown(msg.getGoAwayDebugString(), graceTimeInNanos);
1✔
807
      gracefulShutdown.start(ctx);
1✔
808
    }
809
    promise.setSuccess();
1✔
810
  }
1✔
811

812
  private void forcefulClose(final ChannelHandlerContext ctx, final ForcefulCloseCommand msg,
813
      ChannelPromise promise) throws Exception {
814
    super.close(ctx, promise);
1✔
815
    connection().forEachActiveStream(new Http2StreamVisitor() {
1✔
816
      @Override
817
      public boolean visit(Http2Stream stream) throws Http2Exception {
818
        NettyServerStream.TransportState serverStream = serverStream(stream);
1✔
819
        if (serverStream != null) {
1✔
820
          try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.forcefulClose")) {
1✔
821
            PerfMark.attachTag(serverStream.tag());
1✔
822
            PerfMark.linkIn(msg.getLink());
1✔
823
            serverStream.transportReportStatus(msg.getStatus());
1✔
824
            resetStream(ctx, stream.id(), Http2Error.CANCEL.code(), ctx.newPromise());
1✔
825
          }
826
        }
827
        stream.close();
1✔
828
        return true;
1✔
829
      }
830
    });
831
  }
1✔
832

833
  private void respondWithHttpError(
834
      ChannelHandlerContext ctx, int streamId, int code, Status.Code statusCode, String msg) {
835
    Metadata metadata = new Metadata();
1✔
836
    metadata.put(InternalStatus.CODE_KEY, statusCode.toStatus());
1✔
837
    metadata.put(InternalStatus.MESSAGE_KEY, msg);
1✔
838
    byte[][] serialized = InternalMetadata.serialize(metadata);
1✔
839

840
    Http2Headers headers = new DefaultHttp2Headers(true, serialized.length / 2)
1✔
841
        .status("" + code)
1✔
842
        .set(CONTENT_TYPE_HEADER, "text/plain; charset=utf-8");
1✔
843
    for (int i = 0; i < serialized.length; i += 2) {
1✔
844
      headers.add(new AsciiString(serialized[i], false), new AsciiString(serialized[i + 1], false));
1✔
845
    }
846
    encoder().writeHeaders(ctx, streamId, headers, 0, false, ctx.newPromise());
1✔
847
    ByteBuf msgBuf = ByteBufUtil.writeUtf8(ctx.alloc(), msg);
1✔
848
    encoder().writeData(ctx, streamId, msgBuf, 0, true, ctx.newPromise());
1✔
849
  }
1✔
850

851
  private Http2Stream requireHttp2Stream(int streamId) {
852
    Http2Stream stream = connection().stream(streamId);
1✔
853
    if (stream == null) {
1✔
854
      // This should never happen.
855
      throw new AssertionError("Stream does not exist: " + streamId);
×
856
    }
857
    return stream;
1✔
858
  }
859

860
  /**
861
   * Returns the server stream associated to the given HTTP/2 stream object.
862
   */
863
  private NettyServerStream.TransportState serverStream(Http2Stream stream) {
864
    return stream == null ? null : (NettyServerStream.TransportState) stream.getProperty(streamKey);
1✔
865
  }
866

867
  private Http2Exception newStreamException(int streamId, Throwable cause) {
868
    return Http2Exception.streamError(
1✔
869
        streamId, Http2Error.INTERNAL_ERROR, cause, Strings.nullToEmpty(cause.getMessage()));
1✔
870
  }
871

872
  private class FrameListener extends Http2FrameAdapter {
1✔
873
    private boolean firstSettings = true;
1✔
874

875
    @Override
876
    public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) {
877
      if (firstSettings) {
1✔
878
        firstSettings = false;
1✔
879
        // Delay transportReady until we see the client's HTTP handshake, for coverage with
880
        // handshakeTimeout
881
        attributes = transportListener.transportReady(negotiationAttributes);
1✔
882
      }
883
    }
1✔
884

885
    @Override
886
    public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
887
        boolean endOfStream) throws Http2Exception {
888
      if (keepAliveManager != null) {
1✔
889
        keepAliveManager.onDataReceived();
1✔
890
      }
891
      NettyServerHandler.this.onDataRead(streamId, data, padding, endOfStream);
1✔
892
      return padding;
1✔
893
    }
894

895
    @Override
896
    public void onHeadersRead(ChannelHandlerContext ctx,
897
        int streamId,
898
        Http2Headers headers,
899
        int streamDependency,
900
        short weight,
901
        boolean exclusive,
902
        int padding,
903
        boolean endStream) throws Http2Exception {
904
      if (keepAliveManager != null) {
1✔
905
        keepAliveManager.onDataReceived();
1✔
906
      }
907
      NettyServerHandler.this.onHeadersRead(ctx, streamId, headers);
1✔
908
      if (endStream) {
1✔
909
        NettyServerHandler.this.onDataRead(streamId, Unpooled.EMPTY_BUFFER, 0, endStream);
×
910
      }
911
    }
1✔
912

913
    @Override
914
    public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode)
915
        throws Http2Exception {
916
      if (keepAliveManager != null) {
1✔
917
        keepAliveManager.onDataReceived();
1✔
918
      }
919
      NettyServerHandler.this.onRstStreamRead(streamId, errorCode);
1✔
920
    }
1✔
921

922
    @Override
923
    public void onPingRead(ChannelHandlerContext ctx, long data) throws Http2Exception {
924
      if (keepAliveManager != null) {
1✔
925
        keepAliveManager.onDataReceived();
1✔
926
      }
927
      if (!keepAliveEnforcer.pingAcceptable()) {
1✔
928
        ByteBuf debugData = ByteBufUtil.writeAscii(ctx.alloc(), "too_many_pings");
1✔
929
        goAway(ctx, connection().remote().lastStreamCreated(), Http2Error.ENHANCE_YOUR_CALM.code(),
1✔
930
            debugData, ctx.newPromise());
1✔
931
        Status status = Status.RESOURCE_EXHAUSTED.withDescription("Too many pings from client");
1✔
932
        try {
933
          forcefulClose(ctx, new ForcefulCloseCommand(status), ctx.newPromise());
1✔
934
        } catch (Exception ex) {
×
935
          onError(ctx, /* outbound= */ true, ex);
×
936
        }
1✔
937
      }
938
    }
1✔
939

940
    @Override
941
    public void onPingAckRead(ChannelHandlerContext ctx, long data) throws Http2Exception {
942
      if (keepAliveManager != null) {
1✔
943
        keepAliveManager.onDataReceived();
1✔
944
      }
945
      if (data == flowControlPing().payload()) {
1✔
946
        flowControlPing().updateWindow();
1✔
947
        logger.log(Level.FINE, "Window: {0}",
1✔
948
            decoder().flowController().initialWindowSize(connection().connectionStream()));
1✔
949
      } else if (data == GRACEFUL_SHUTDOWN_PING) {
1✔
950
        if (gracefulShutdown == null) {
1✔
951
          // this should never happen
952
          logger.warning("Received GRACEFUL_SHUTDOWN_PING Ack but gracefulShutdown is null");
×
953
        } else {
954
          gracefulShutdown.secondGoAwayAndClose(ctx);
1✔
955
        }
956
      } else if (data != KEEPALIVE_PING) {
1✔
957
        logger.warning("Received unexpected ping ack. No ping outstanding");
×
958
      }
959
    }
1✔
960
  }
961

962
  private final class KeepAlivePinger implements KeepAliveManager.KeepAlivePinger {
963
    final ChannelHandlerContext ctx;
964

965
    KeepAlivePinger(ChannelHandlerContext ctx) {
1✔
966
      this.ctx = ctx;
1✔
967
    }
1✔
968

969
    @Override
970
    public void ping() {
971
      ChannelFuture pingFuture = encoder().writePing(
1✔
972
          ctx, false /* isAck */, KEEPALIVE_PING, ctx.newPromise());
1✔
973
      ctx.flush();
1✔
974
      pingFuture.addListener(new ChannelFutureListener() {
1✔
975
        @Override
976
        public void operationComplete(ChannelFuture future) throws Exception {
977
          if (future.isSuccess()) {
1✔
978
            transportTracer.reportKeepAliveSent();
1✔
979
          }
980
        }
1✔
981
      });
982
    }
1✔
983

984
    @Override
985
    public void onPingTimeout() {
986
      try {
987
        forcefulClose(
1✔
988
            ctx,
989
            new ForcefulCloseCommand(Status.UNAVAILABLE
990
                .withDescription("Keepalive failed. The connection is likely gone")),
1✔
991
            ctx.newPromise());
1✔
992
      } catch (Exception ex) {
×
993
        try {
994
          exceptionCaught(ctx, ex);
×
995
        } catch (Exception ex2) {
×
996
          logger.log(Level.WARNING, "Exception while propagating exception", ex2);
×
997
          logger.log(Level.WARNING, "Original failure", ex);
×
998
        }
×
999
      }
1✔
1000
    }
1✔
1001
  }
1002

1003
  private final class GracefulShutdown {
1004
    String goAwayMessage;
1005

1006
    /**
1007
     * The grace time between starting graceful shutdown and closing the netty channel,
1008
     * {@code null} is unspecified.
1009
     */
1010
    @CheckForNull
1011
    Long graceTimeInNanos;
1012

1013
    /**
1014
     * True if ping is Acked or ping is timeout.
1015
     */
1016
    boolean pingAckedOrTimeout;
1017

1018
    Future<?> pingFuture;
1019

1020
    GracefulShutdown(String goAwayMessage,
1021
        @Nullable Long graceTimeInNanos) {
1✔
1022
      this.goAwayMessage = goAwayMessage;
1✔
1023
      this.graceTimeInNanos = graceTimeInNanos;
1✔
1024
    }
1✔
1025

1026
    /**
1027
     * Sends out first GOAWAY and ping, and schedules second GOAWAY and close.
1028
     */
1029
    void start(final ChannelHandlerContext ctx) {
1030
      goAway(
1✔
1031
          ctx,
1032
          Integer.MAX_VALUE,
1033
          Http2Error.NO_ERROR.code(),
1✔
1034
          ByteBufUtil.writeAscii(ctx.alloc(), goAwayMessage),
1✔
1035
          ctx.newPromise());
1✔
1036

1037
      pingFuture = ctx.executor().schedule(
1✔
1038
          new Runnable() {
1✔
1039
            @Override
1040
            public void run() {
1041
              secondGoAwayAndClose(ctx);
1✔
1042
            }
1✔
1043
          },
1044
          GRACEFUL_SHUTDOWN_PING_TIMEOUT_NANOS,
1✔
1045
          TimeUnit.NANOSECONDS);
1046

1047
      encoder().writePing(ctx, false /* isAck */, GRACEFUL_SHUTDOWN_PING, ctx.newPromise());
1✔
1048
    }
1✔
1049

1050
    void secondGoAwayAndClose(ChannelHandlerContext ctx) {
1051
      if (pingAckedOrTimeout) {
1✔
1052
        return;
×
1053
      }
1054
      pingAckedOrTimeout = true;
1✔
1055

1056
      checkNotNull(pingFuture, "pingFuture");
1✔
1057
      pingFuture.cancel(false);
1✔
1058

1059
      // send the second GOAWAY with last stream id
1060
      goAway(
1✔
1061
          ctx,
1062
          connection().remote().lastStreamCreated(),
1✔
1063
          Http2Error.NO_ERROR.code(),
1✔
1064
          ByteBufUtil.writeAscii(ctx.alloc(), goAwayMessage),
1✔
1065
          ctx.newPromise());
1✔
1066

1067
      // gracefully shutdown with specified grace time
1068
      long savedGracefulShutdownTimeMillis = gracefulShutdownTimeoutMillis();
1✔
1069
      long overriddenGraceTime = graceTimeOverrideMillis(savedGracefulShutdownTimeMillis);
1✔
1070
      try {
1071
        gracefulShutdownTimeoutMillis(overriddenGraceTime);
1✔
1072
        NettyServerHandler.super.close(ctx, ctx.newPromise());
1✔
1073
      } catch (Exception e) {
×
1074
        onError(ctx, /* outbound= */ true, e);
×
1075
      } finally {
1076
        gracefulShutdownTimeoutMillis(savedGracefulShutdownTimeMillis);
1✔
1077
      }
1078
    }
1✔
1079

1080
    private long graceTimeOverrideMillis(long originalMillis) {
1081
      if (graceTimeInNanos == null) {
1✔
1082
        return originalMillis;
1✔
1083
      }
1084
      if (graceTimeInNanos == MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE) {
1✔
1085
        // netty treats -1 as "no timeout"
1086
        return -1L;
1✔
1087
      }
1088
      return TimeUnit.NANOSECONDS.toMillis(graceTimeInNanos);
1✔
1089
    }
1090
  }
1091

1092
  // Use a frame writer so that we know when frames are through flow control and actually being
1093
  // written.
1094
  private static class WriteMonitoringFrameWriter extends DecoratingHttp2FrameWriter {
1095
    private final KeepAliveEnforcer keepAliveEnforcer;
1096

1097
    public WriteMonitoringFrameWriter(Http2FrameWriter delegate,
1098
        KeepAliveEnforcer keepAliveEnforcer) {
1099
      super(delegate);
1✔
1100
      this.keepAliveEnforcer = keepAliveEnforcer;
1✔
1101
    }
1✔
1102

1103
    @Override
1104
    public ChannelFuture writeData(ChannelHandlerContext ctx, int streamId, ByteBuf data,
1105
        int padding, boolean endStream, ChannelPromise promise) {
1106
      keepAliveEnforcer.resetCounters();
1✔
1107
      return super.writeData(ctx, streamId, data, padding, endStream, promise);
1✔
1108
    }
1109

1110
    @Override
1111
    public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
1112
        int padding, boolean endStream, ChannelPromise promise) {
1113
      keepAliveEnforcer.resetCounters();
1✔
1114
      return super.writeHeaders(ctx, streamId, headers, padding, endStream, promise);
1✔
1115
    }
1116

1117
    @Override
1118
    public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
1119
        int streamDependency, short weight, boolean exclusive, int padding, boolean endStream,
1120
        ChannelPromise promise) {
1121
      keepAliveEnforcer.resetCounters();
×
1122
      return super.writeHeaders(ctx, streamId, headers, streamDependency, weight, exclusive,
×
1123
          padding, endStream, promise);
1124
    }
1125
  }
1126

1127
  private static class ServerChannelLogger extends ChannelLogger {
1128
    private static final Logger log = Logger.getLogger(ChannelLogger.class.getName());
1✔
1129

1130
    @Override
1131
    public void log(ChannelLogLevel level, String message) {
1132
      log.log(toJavaLogLevel(level), message);
1✔
1133
    }
1✔
1134

1135
    @Override
1136
    public void log(ChannelLogLevel level, String messageFormat, Object... args) {
1137
      log(level, MessageFormat.format(messageFormat, args));
1✔
1138
    }
1✔
1139
  }
1140

1141
  private static Level toJavaLogLevel(ChannelLogLevel level) {
1142
    switch (level) {
1✔
1143
      case ERROR:
1144
        return Level.FINE;
×
1145
      case WARNING:
1146
        return Level.FINER;
×
1147
      default:
1148
        return Level.FINEST;
1✔
1149
    }
1150
  }
1151
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc