• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19298

21 Jun 2024 11:31PM CUT coverage: 88.456% (-0.002%) from 88.458%
#19298

push

github

web-flow
netty:Fix Netty composite buffer merging to be compatible with Netty 4.1.111 (#11294) (#11303)

* Use addComponent instead of addFlattenedComponent and do not append to components that are composites.

32060 of 36244 relevant lines covered (88.46%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.09
/../netty/src/main/java/io/grpc/netty/NettyServerHandler.java
1
/*
2
 * Copyright 2014 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.netty;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static io.grpc.internal.GrpcUtil.SERVER_KEEPALIVE_TIME_NANOS_DISABLED;
22
import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE;
23
import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_AGE_NANOS_DISABLED;
24
import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_IDLE_NANOS_DISABLED;
25
import static io.grpc.netty.Utils.CONTENT_TYPE_HEADER;
26
import static io.grpc.netty.Utils.HTTP_METHOD;
27
import static io.grpc.netty.Utils.TE_HEADER;
28
import static io.grpc.netty.Utils.TE_TRAILERS;
29
import static io.netty.handler.codec.http.HttpHeaderNames.CONNECTION;
30
import static io.netty.handler.codec.http.HttpHeaderNames.HOST;
31
import static io.netty.handler.codec.http2.DefaultHttp2LocalFlowController.DEFAULT_WINDOW_UPDATE_RATIO;
32
import static io.netty.handler.codec.http2.Http2Headers.PseudoHeaderName.AUTHORITY;
33

34
import com.google.common.annotations.VisibleForTesting;
35
import com.google.common.base.Preconditions;
36
import com.google.common.base.Strings;
37
import com.google.common.base.Ticker;
38
import io.grpc.Attributes;
39
import io.grpc.ChannelLogger;
40
import io.grpc.ChannelLogger.ChannelLogLevel;
41
import io.grpc.InternalChannelz;
42
import io.grpc.InternalMetadata;
43
import io.grpc.InternalStatus;
44
import io.grpc.Metadata;
45
import io.grpc.ServerStreamTracer;
46
import io.grpc.Status;
47
import io.grpc.internal.GrpcUtil;
48
import io.grpc.internal.KeepAliveEnforcer;
49
import io.grpc.internal.KeepAliveManager;
50
import io.grpc.internal.LogExceptionRunnable;
51
import io.grpc.internal.MaxConnectionIdleManager;
52
import io.grpc.internal.ServerTransportListener;
53
import io.grpc.internal.StatsTraceContext;
54
import io.grpc.internal.TransportTracer;
55
import io.grpc.netty.GrpcHttp2HeadersUtils.GrpcHttp2ServerHeadersDecoder;
56
import io.netty.buffer.ByteBuf;
57
import io.netty.buffer.ByteBufUtil;
58
import io.netty.buffer.Unpooled;
59
import io.netty.channel.ChannelFuture;
60
import io.netty.channel.ChannelFutureListener;
61
import io.netty.channel.ChannelHandlerContext;
62
import io.netty.channel.ChannelPromise;
63
import io.netty.handler.codec.http2.DecoratingHttp2FrameWriter;
64
import io.netty.handler.codec.http2.DefaultHttp2Connection;
65
import io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder;
66
import io.netty.handler.codec.http2.DefaultHttp2ConnectionEncoder;
67
import io.netty.handler.codec.http2.DefaultHttp2FrameReader;
68
import io.netty.handler.codec.http2.DefaultHttp2FrameWriter;
69
import io.netty.handler.codec.http2.DefaultHttp2Headers;
70
import io.netty.handler.codec.http2.DefaultHttp2LocalFlowController;
71
import io.netty.handler.codec.http2.DefaultHttp2RemoteFlowController;
72
import io.netty.handler.codec.http2.Http2Connection;
73
import io.netty.handler.codec.http2.Http2ConnectionAdapter;
74
import io.netty.handler.codec.http2.Http2ConnectionDecoder;
75
import io.netty.handler.codec.http2.Http2ConnectionEncoder;
76
import io.netty.handler.codec.http2.Http2Error;
77
import io.netty.handler.codec.http2.Http2Exception;
78
import io.netty.handler.codec.http2.Http2Exception.StreamException;
79
import io.netty.handler.codec.http2.Http2FrameAdapter;
80
import io.netty.handler.codec.http2.Http2FrameLogger;
81
import io.netty.handler.codec.http2.Http2FrameReader;
82
import io.netty.handler.codec.http2.Http2FrameWriter;
83
import io.netty.handler.codec.http2.Http2Headers;
84
import io.netty.handler.codec.http2.Http2HeadersDecoder;
85
import io.netty.handler.codec.http2.Http2InboundFrameLogger;
86
import io.netty.handler.codec.http2.Http2OutboundFrameLogger;
87
import io.netty.handler.codec.http2.Http2Settings;
88
import io.netty.handler.codec.http2.Http2Stream;
89
import io.netty.handler.codec.http2.Http2StreamVisitor;
90
import io.netty.handler.codec.http2.WeightedFairQueueByteDistributor;
91
import io.netty.handler.logging.LogLevel;
92
import io.netty.util.AsciiString;
93
import io.netty.util.ReferenceCountUtil;
94
import io.perfmark.PerfMark;
95
import io.perfmark.Tag;
96
import io.perfmark.TaskCloseable;
97
import java.text.MessageFormat;
98
import java.util.List;
99
import java.util.concurrent.Future;
100
import java.util.concurrent.ScheduledFuture;
101
import java.util.concurrent.TimeUnit;
102
import java.util.logging.Level;
103
import java.util.logging.Logger;
104
import javax.annotation.CheckForNull;
105
import javax.annotation.Nullable;
106

107
/**
108
 * Server-side Netty handler for GRPC processing. All event handlers are executed entirely within
109
 * the context of the Netty Channel thread.
110
 */
111
class NettyServerHandler extends AbstractNettyHandler {
112
  private static final Logger logger = Logger.getLogger(NettyServerHandler.class.getName());
1✔
113
  private static final long KEEPALIVE_PING = 0xDEADL;
114
  @VisibleForTesting
115
  static final long GRACEFUL_SHUTDOWN_PING = 0x97ACEF001L;
116
  private static final long GRACEFUL_SHUTDOWN_PING_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(10);
1✔
117
  /** Temporary workaround for #8674. Fine to delete after v1.45 release, and maybe earlier. */
118
  private static final boolean DISABLE_CONNECTION_HEADER_CHECK = Boolean.parseBoolean(
1✔
119
      System.getProperty("io.grpc.netty.disableConnectionHeaderCheck", "false"));
1✔
120

121
  private final Http2Connection.PropertyKey streamKey;
122
  private final ServerTransportListener transportListener;
123
  private final int maxMessageSize;
124
  private final long keepAliveTimeInNanos;
125
  private final long keepAliveTimeoutInNanos;
126
  private final long maxConnectionAgeInNanos;
127
  private final long maxConnectionAgeGraceInNanos;
128
  private final int maxRstCount;
129
  private final long maxRstPeriodNanos;
130
  private final List<? extends ServerStreamTracer.Factory> streamTracerFactories;
131
  private final TransportTracer transportTracer;
132
  private final KeepAliveEnforcer keepAliveEnforcer;
133
  private final Attributes eagAttributes;
134
  private final Ticker ticker;
135
  /** Incomplete attributes produced by negotiator. */
136
  private Attributes negotiationAttributes;
137
  private InternalChannelz.Security securityInfo;
138
  /** Completed attributes produced by transportReady. */
139
  private Attributes attributes;
140
  private Throwable connectionError;
141
  private boolean teWarningLogged;
142
  private WriteQueue serverWriteQueue;
143
  private AsciiString lastKnownAuthority;
144
  @CheckForNull
145
  private KeepAliveManager keepAliveManager;
146
  @CheckForNull
147
  private MaxConnectionIdleManager maxConnectionIdleManager;
148
  @CheckForNull
149
  private ScheduledFuture<?> maxConnectionAgeMonitor;
150
  @CheckForNull
151
  private GracefulShutdown gracefulShutdown;
152
  private int rstCount;
153
  private long lastRstNanoTime;
154

155

156
  static NettyServerHandler newHandler(
157
      ServerTransportListener transportListener,
158
      ChannelPromise channelUnused,
159
      List<? extends ServerStreamTracer.Factory> streamTracerFactories,
160
      TransportTracer transportTracer,
161
      int maxStreams,
162
      boolean autoFlowControl,
163
      int flowControlWindow,
164
      int maxHeaderListSize,
165
      int maxMessageSize,
166
      long keepAliveTimeInNanos,
167
      long keepAliveTimeoutInNanos,
168
      long maxConnectionIdleInNanos,
169
      long maxConnectionAgeInNanos,
170
      long maxConnectionAgeGraceInNanos,
171
      boolean permitKeepAliveWithoutCalls,
172
      long permitKeepAliveTimeInNanos,
173
      int maxRstCount,
174
      long maxRstPeriodNanos,
175
      Attributes eagAttributes) {
176
    Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive: %s",
1✔
177
        maxHeaderListSize);
178
    Http2FrameLogger frameLogger = new Http2FrameLogger(LogLevel.DEBUG, NettyServerHandler.class);
1✔
179
    Http2HeadersDecoder headersDecoder = new GrpcHttp2ServerHeadersDecoder(maxHeaderListSize);
1✔
180
    Http2FrameReader frameReader = new Http2InboundFrameLogger(
1✔
181
        new DefaultHttp2FrameReader(headersDecoder), frameLogger);
182
    Http2FrameWriter frameWriter =
1✔
183
        new Http2OutboundFrameLogger(new DefaultHttp2FrameWriter(), frameLogger);
184
    return newHandler(
1✔
185
        channelUnused,
186
        frameReader,
187
        frameWriter,
188
        transportListener,
189
        streamTracerFactories,
190
        transportTracer,
191
        maxStreams,
192
        autoFlowControl,
193
        flowControlWindow,
194
        maxHeaderListSize,
195
        maxMessageSize,
196
        keepAliveTimeInNanos,
197
        keepAliveTimeoutInNanos,
198
        maxConnectionIdleInNanos,
199
        maxConnectionAgeInNanos,
200
        maxConnectionAgeGraceInNanos,
201
        permitKeepAliveWithoutCalls,
202
        permitKeepAliveTimeInNanos,
203
        maxRstCount,
204
        maxRstPeriodNanos,
205
        eagAttributes,
206
        Ticker.systemTicker());
1✔
207
  }
208

209
  static NettyServerHandler newHandler(
210
      ChannelPromise channelUnused,
211
      Http2FrameReader frameReader,
212
      Http2FrameWriter frameWriter,
213
      ServerTransportListener transportListener,
214
      List<? extends ServerStreamTracer.Factory> streamTracerFactories,
215
      TransportTracer transportTracer,
216
      int maxStreams,
217
      boolean autoFlowControl,
218
      int flowControlWindow,
219
      int maxHeaderListSize,
220
      int maxMessageSize,
221
      long keepAliveTimeInNanos,
222
      long keepAliveTimeoutInNanos,
223
      long maxConnectionIdleInNanos,
224
      long maxConnectionAgeInNanos,
225
      long maxConnectionAgeGraceInNanos,
226
      boolean permitKeepAliveWithoutCalls,
227
      long permitKeepAliveTimeInNanos,
228
      int maxRstCount,
229
      long maxRstPeriodNanos,
230
      Attributes eagAttributes,
231
      Ticker ticker) {
232
    Preconditions.checkArgument(maxStreams > 0, "maxStreams must be positive: %s", maxStreams);
1✔
233
    Preconditions.checkArgument(flowControlWindow > 0, "flowControlWindow must be positive: %s",
1✔
234
        flowControlWindow);
235
    Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive: %s",
1✔
236
        maxHeaderListSize);
237
    Preconditions.checkArgument(maxMessageSize > 0, "maxMessageSize must be positive: %s",
1✔
238
        maxMessageSize);
239

240
    final Http2Connection connection = new DefaultHttp2Connection(true);
1✔
241
    WeightedFairQueueByteDistributor dist = new WeightedFairQueueByteDistributor(connection);
1✔
242
    dist.allocationQuantum(16 * 1024); // Make benchmarks fast again.
1✔
243
    DefaultHttp2RemoteFlowController controller =
1✔
244
        new DefaultHttp2RemoteFlowController(connection, dist);
245
    connection.remote().flowController(controller);
1✔
246
    final KeepAliveEnforcer keepAliveEnforcer = new KeepAliveEnforcer(
1✔
247
        permitKeepAliveWithoutCalls, permitKeepAliveTimeInNanos, TimeUnit.NANOSECONDS);
248

249
    // Create the local flow controller configured to auto-refill the connection window.
250
    connection.local().flowController(
1✔
251
        new DefaultHttp2LocalFlowController(connection, DEFAULT_WINDOW_UPDATE_RATIO, true));
252
    frameWriter = new WriteMonitoringFrameWriter(frameWriter, keepAliveEnforcer);
1✔
253
    Http2ConnectionEncoder encoder =
1✔
254
        new DefaultHttp2ConnectionEncoder(connection, frameWriter);
255
    encoder = new Http2ControlFrameLimitEncoder(encoder, 10000);
1✔
256
    Http2ConnectionDecoder decoder = new DefaultHttp2ConnectionDecoder(connection, encoder,
1✔
257
        frameReader);
258

259
    Http2Settings settings = new Http2Settings();
1✔
260
    settings.initialWindowSize(flowControlWindow);
1✔
261
    settings.maxConcurrentStreams(maxStreams);
1✔
262
    settings.maxHeaderListSize(maxHeaderListSize);
1✔
263

264
    if (ticker == null) {
1✔
265
      ticker = Ticker.systemTicker();
×
266
    }
267

268
    return new NettyServerHandler(
1✔
269
        channelUnused,
270
        connection,
271
        transportListener,
272
        streamTracerFactories,
273
        transportTracer,
274
        decoder, encoder, settings,
275
        maxMessageSize,
276
        keepAliveTimeInNanos, keepAliveTimeoutInNanos,
277
        maxConnectionIdleInNanos,
278
        maxConnectionAgeInNanos, maxConnectionAgeGraceInNanos,
279
        keepAliveEnforcer,
280
        autoFlowControl,
281
        maxRstCount,
282
        maxRstPeriodNanos,
283
        eagAttributes, ticker);
284
  }
285

286
  private NettyServerHandler(
287
      ChannelPromise channelUnused,
288
      final Http2Connection connection,
289
      ServerTransportListener transportListener,
290
      List<? extends ServerStreamTracer.Factory> streamTracerFactories,
291
      TransportTracer transportTracer,
292
      Http2ConnectionDecoder decoder,
293
      Http2ConnectionEncoder encoder,
294
      Http2Settings settings,
295
      int maxMessageSize,
296
      long keepAliveTimeInNanos,
297
      long keepAliveTimeoutInNanos,
298
      long maxConnectionIdleInNanos,
299
      long maxConnectionAgeInNanos,
300
      long maxConnectionAgeGraceInNanos,
301
      final KeepAliveEnforcer keepAliveEnforcer,
302
      boolean autoFlowControl,
303
      int maxRstCount,
304
      long maxRstPeriodNanos,
305
      Attributes eagAttributes,
306
      Ticker ticker) {
307
    super(channelUnused, decoder, encoder, settings, new ServerChannelLogger(),
1✔
308
        autoFlowControl, null, ticker);
309

310
    final MaxConnectionIdleManager maxConnectionIdleManager;
311
    if (maxConnectionIdleInNanos == MAX_CONNECTION_IDLE_NANOS_DISABLED) {
1✔
312
      maxConnectionIdleManager = null;
1✔
313
    } else {
314
      maxConnectionIdleManager = new MaxConnectionIdleManager(maxConnectionIdleInNanos);
1✔
315
    }
316

317
    connection.addListener(new Http2ConnectionAdapter() {
1✔
318
      @Override
319
      public void onStreamActive(Http2Stream stream) {
320
        if (connection.numActiveStreams() == 1) {
1✔
321
          keepAliveEnforcer.onTransportActive();
1✔
322
          if (maxConnectionIdleManager != null) {
1✔
323
            maxConnectionIdleManager.onTransportActive();
1✔
324
          }
325
        }
326
      }
1✔
327

328
      @Override
329
      public void onStreamClosed(Http2Stream stream) {
330
        if (connection.numActiveStreams() == 0) {
1✔
331
          keepAliveEnforcer.onTransportIdle();
1✔
332
          if (maxConnectionIdleManager != null) {
1✔
333
            maxConnectionIdleManager.onTransportIdle();
1✔
334
          }
335
        }
336
      }
1✔
337
    });
338

339
    checkArgument(maxMessageSize >= 0, "maxMessageSize must be non-negative: %s", maxMessageSize);
1✔
340
    this.maxMessageSize = maxMessageSize;
1✔
341
    this.keepAliveTimeInNanos = keepAliveTimeInNanos;
1✔
342
    this.keepAliveTimeoutInNanos = keepAliveTimeoutInNanos;
1✔
343
    this.maxConnectionIdleManager = maxConnectionIdleManager;
1✔
344
    this.maxConnectionAgeInNanos = maxConnectionAgeInNanos;
1✔
345
    this.maxConnectionAgeGraceInNanos = maxConnectionAgeGraceInNanos;
1✔
346
    this.keepAliveEnforcer = checkNotNull(keepAliveEnforcer, "keepAliveEnforcer");
1✔
347
    this.maxRstCount = maxRstCount;
1✔
348
    this.maxRstPeriodNanos = maxRstPeriodNanos;
1✔
349
    this.eagAttributes = checkNotNull(eagAttributes, "eagAttributes");
1✔
350
    this.ticker = checkNotNull(ticker, "ticker");
1✔
351

352
    this.lastRstNanoTime = ticker.read();
1✔
353
    streamKey = encoder.connection().newKey();
1✔
354
    this.transportListener = checkNotNull(transportListener, "transportListener");
1✔
355
    this.streamTracerFactories = checkNotNull(streamTracerFactories, "streamTracerFactories");
1✔
356
    this.transportTracer = checkNotNull(transportTracer, "transportTracer");
1✔
357
    // Set the frame listener on the decoder.
358
    decoder().frameListener(new FrameListener());
1✔
359
  }
1✔
360

361
  @Nullable
362
  Throwable connectionError() {
363
    return connectionError;
1✔
364
  }
365

366
  @Override
367
  public void handlerAdded(final ChannelHandlerContext ctx) throws Exception {
368
    serverWriteQueue = new WriteQueue(ctx.channel());
1✔
369

370
    // init max connection age monitor
371
    if (maxConnectionAgeInNanos != MAX_CONNECTION_AGE_NANOS_DISABLED) {
1✔
372
      maxConnectionAgeMonitor = ctx.executor().schedule(
1✔
373
          new LogExceptionRunnable(new Runnable() {
1✔
374
            @Override
375
            public void run() {
376
              if (gracefulShutdown == null) {
1✔
377
                gracefulShutdown = new GracefulShutdown("max_age", maxConnectionAgeGraceInNanos);
1✔
378
                gracefulShutdown.start(ctx);
1✔
379
                ctx.flush();
1✔
380
              }
381
            }
1✔
382
          }),
383
          maxConnectionAgeInNanos,
384
          TimeUnit.NANOSECONDS);
385
    }
386

387
    if (maxConnectionIdleManager != null) {
1✔
388
      maxConnectionIdleManager.start(new Runnable() {
1✔
389
        @Override
390
        public void run() {
391
          if (gracefulShutdown == null) {
1✔
392
            gracefulShutdown = new GracefulShutdown("max_idle", null);
1✔
393
            gracefulShutdown.start(ctx);
1✔
394
            ctx.flush();
1✔
395
          }
396
        }
1✔
397
      }, ctx.executor());
1✔
398
    }
399

400
    if (keepAliveTimeInNanos != SERVER_KEEPALIVE_TIME_NANOS_DISABLED) {
1✔
401
      keepAliveManager = new KeepAliveManager(new KeepAlivePinger(ctx), ctx.executor(),
1✔
402
          keepAliveTimeInNanos, keepAliveTimeoutInNanos, true /* keepAliveDuringTransportIdle */);
403
      keepAliveManager.onTransportStarted();
1✔
404
    }
405

406
    assert encoder().connection().equals(decoder().connection());
1✔
407
    transportTracer.setFlowControlWindowReader(new Utils.FlowControlReader(encoder().connection()));
1✔
408

409
    super.handlerAdded(ctx);
1✔
410
  }
1✔
411

412
  private void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers)
413
      throws Http2Exception {
414
    try {
415
      // Connection-specific header fields makes a request malformed. Ideally this would be handled
416
      // by Netty. RFC 7540 section 8.1.2.2
417
      if (!DISABLE_CONNECTION_HEADER_CHECK && headers.contains(CONNECTION)) {
1✔
418
        resetStream(ctx, streamId, Http2Error.PROTOCOL_ERROR.code(), ctx.newPromise());
×
419
        return;
×
420
      }
421

422
      if (headers.authority() == null) {
1✔
423
        List<CharSequence> hosts = headers.getAll(HOST);
1✔
424
        if (hosts.size() > 1) {
1✔
425
          // RFC 7230 section 5.4
426
          respondWithHttpError(ctx, streamId, 400, Status.Code.INTERNAL,
1✔
427
              "Multiple host headers");
428
          return;
1✔
429
        }
430
        if (!hosts.isEmpty()) {
1✔
431
          headers.add(AUTHORITY.value(), hosts.get(0));
1✔
432
        }
433
      }
434
      headers.remove(HOST);
1✔
435

436
      // Remove the leading slash of the path and get the fully qualified method name
437
      CharSequence path = headers.path();
1✔
438

439
      if (path == null) {
1✔
440
        respondWithHttpError(ctx, streamId, 404, Status.Code.UNIMPLEMENTED,
1✔
441
            "Expected path but is missing");
442
        return;
1✔
443
      }
444

445
      if (path.charAt(0) != '/') {
1✔
446
        respondWithHttpError(ctx, streamId, 404, Status.Code.UNIMPLEMENTED,
1✔
447
            String.format("Expected path to start with /: %s", path));
1✔
448
        return;
1✔
449
      }
450

451
      String method = path.subSequence(1, path.length()).toString();
1✔
452

453
      // Verify that the Content-Type is correct in the request.
454
      CharSequence contentType = headers.get(CONTENT_TYPE_HEADER);
1✔
455
      if (contentType == null) {
1✔
456
        respondWithHttpError(
1✔
457
            ctx, streamId, 415, Status.Code.INTERNAL, "Content-Type is missing from the request");
458
        return;
1✔
459
      }
460
      String contentTypeString = contentType.toString();
1✔
461
      if (!GrpcUtil.isGrpcContentType(contentTypeString)) {
1✔
462
        respondWithHttpError(ctx, streamId, 415, Status.Code.INTERNAL,
1✔
463
            String.format("Content-Type '%s' is not supported", contentTypeString));
1✔
464
        return;
1✔
465
      }
466

467
      if (!HTTP_METHOD.contentEquals(headers.method())) {
1✔
468
        respondWithHttpError(ctx, streamId, 405, Status.Code.INTERNAL,
1✔
469
            String.format("Method '%s' is not supported", headers.method()));
1✔
470
        return;
1✔
471
      }
472

473
      if (!teWarningLogged && !TE_TRAILERS.contentEquals(headers.get(TE_HEADER))) {
1✔
474
        logger.warning(String.format("Expected header TE: %s, but %s is received. This means "
1✔
475
                + "some intermediate proxy may not support trailers",
476
            TE_TRAILERS, headers.get(TE_HEADER)));
1✔
477
        teWarningLogged = true;
1✔
478
      }
479

480
      // The Http2Stream object was put by AbstractHttp2ConnectionHandler before calling this
481
      // method.
482
      Http2Stream http2Stream = requireHttp2Stream(streamId);
1✔
483

484
      Metadata metadata = Utils.convertHeaders(headers);
1✔
485
      StatsTraceContext statsTraceCtx =
1✔
486
          StatsTraceContext.newServerContext(streamTracerFactories, method, metadata);
1✔
487

488
      NettyServerStream.TransportState state = new NettyServerStream.TransportState(
1✔
489
          this,
490
          ctx.channel().eventLoop(),
1✔
491
          http2Stream,
492
          maxMessageSize,
493
          statsTraceCtx,
494
          transportTracer,
495
          method);
496

497
      try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.onHeadersRead")) {
1✔
498
        PerfMark.attachTag(state.tag());
1✔
499
        String authority = getOrUpdateAuthority((AsciiString) headers.authority());
1✔
500
        NettyServerStream stream = new NettyServerStream(
1✔
501
            ctx.channel(),
1✔
502
            state,
503
            attributes,
504
            authority,
505
            statsTraceCtx);
506
        transportListener.streamCreated(stream, method, metadata);
1✔
507
        state.onStreamAllocated();
1✔
508
        http2Stream.setProperty(streamKey, state);
1✔
509
      }
510
    } catch (Exception e) {
×
511
      logger.log(Level.WARNING, "Exception in onHeadersRead()", e);
×
512
      // Throw an exception that will get handled by onStreamError.
513
      throw newStreamException(streamId, e);
×
514
    }
1✔
515
  }
1✔
516

517
  private String getOrUpdateAuthority(AsciiString authority) {
518
    if (authority == null) {
1✔
519
      return null;
1✔
520
    } else if (!authority.equals(lastKnownAuthority)) {
1✔
521
      lastKnownAuthority = authority;
1✔
522
    }
523

524
    // AsciiString.toString() is internally cached, so subsequent calls will not
525
    // result in recomputing the String representation of lastKnownAuthority.
526
    return lastKnownAuthority.toString();
1✔
527
  }
528

529
  private void onDataRead(int streamId, ByteBuf data, int padding, boolean endOfStream)
530
      throws Http2Exception {
531
    flowControlPing().onDataRead(data.readableBytes(), padding);
1✔
532
    try {
533
      NettyServerStream.TransportState stream = serverStream(requireHttp2Stream(streamId));
1✔
534
      if (stream == null) {
1✔
535
        return;
1✔
536
      }
537
      try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.onDataRead")) {
1✔
538
        PerfMark.attachTag(stream.tag());
1✔
539
        stream.inboundDataReceived(data, endOfStream);
1✔
540
      }
541
    } catch (Throwable e) {
×
542
      logger.log(Level.WARNING, "Exception in onDataRead()", e);
×
543
      // Throw an exception that will get handled by onStreamError.
544
      throw newStreamException(streamId, e);
×
545
    }
1✔
546
  }
1✔
547

548
  private void onRstStreamRead(int streamId, long errorCode) throws Http2Exception {
549
    if (maxRstCount > 0) {
1✔
550
      long now = ticker.read();
1✔
551
      if (now - lastRstNanoTime > maxRstPeriodNanos) {
1✔
552
        lastRstNanoTime = now;
1✔
553
        rstCount = 1;
1✔
554
      } else {
555
        rstCount++;
1✔
556
        if (rstCount > maxRstCount) {
1✔
557
          throw new Http2Exception(Http2Error.ENHANCE_YOUR_CALM, "too_many_rststreams") {
1✔
558
            @SuppressWarnings("UnsynchronizedOverridesSynchronized") // No memory accesses
559
            @Override
560
            public Throwable fillInStackTrace() {
561
              // Avoid the CPU cycles, since the resets may be a CPU consumption attack
562
              return this;
1✔
563
            }
564
          };
565
        }
566
      }
567
    }
568

569
    try {
570
      NettyServerStream.TransportState stream = serverStream(connection().stream(streamId));
1✔
571
      if (stream != null) {
1✔
572
        try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.onRstStreamRead")) {
1✔
573
          PerfMark.attachTag(stream.tag());
1✔
574
          stream.transportReportStatus(
1✔
575
              Status.CANCELLED.withDescription("RST_STREAM received for code " + errorCode));
1✔
576
        }
577
      }
578
    } catch (Throwable e) {
1✔
579
      logger.log(Level.WARNING, "Exception in onRstStreamRead()", e);
1✔
580
      // Throw an exception that will get handled by onStreamError.
581
      throw newStreamException(streamId, e);
1✔
582
    }
1✔
583
  }
1✔
584

585
  @Override
586
  protected void onConnectionError(ChannelHandlerContext ctx, boolean outbound, Throwable cause,
587
      Http2Exception http2Ex) {
588
    logger.log(Level.FINE, "Connection Error", cause);
1✔
589
    connectionError = cause;
1✔
590
    super.onConnectionError(ctx, outbound, cause, http2Ex);
1✔
591
  }
1✔
592

593
  @Override
594
  protected void onStreamError(ChannelHandlerContext ctx, boolean outbound, Throwable cause,
595
      StreamException http2Ex) {
596
    NettyServerStream.TransportState serverStream = serverStream(
1✔
597
        connection().stream(Http2Exception.streamId(http2Ex)));
1✔
598
    Level level = Level.WARNING;
1✔
599
    if (serverStream == null && http2Ex.error() == Http2Error.STREAM_CLOSED) {
1✔
600
      level = Level.FINE;
×
601
    }
602
    logger.log(level, "Stream Error", cause);
1✔
603
    Tag tag = serverStream != null ? serverStream.tag() : PerfMark.createTag();
1✔
604
    try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.onStreamError")) {
1✔
605
      PerfMark.attachTag(tag);
1✔
606
      if (serverStream != null) {
1✔
607
        serverStream.transportReportStatus(Utils.statusFromThrowable(cause));
1✔
608
      }
609
      // TODO(ejona): Abort the stream by sending headers to help the client with debugging.
610
      // Delegate to the base class to send a RST_STREAM.
611
      super.onStreamError(ctx, outbound, cause, http2Ex);
1✔
612
    }
613
  }
1✔
614

615
  @Override
616
  public void handleProtocolNegotiationCompleted(
617
      Attributes attrs, InternalChannelz.Security securityInfo) {
618
    negotiationAttributes = attrs;
1✔
619
    this.securityInfo = securityInfo;
1✔
620
    super.handleProtocolNegotiationCompleted(attrs, securityInfo);
1✔
621
    NettyClientHandler.writeBufferingAndRemove(ctx().channel());
1✔
622
  }
1✔
623

624
  @Override
625
  public Attributes getEagAttributes() {
626
    return eagAttributes;
1✔
627
  }
628

629
  InternalChannelz.Security getSecurityInfo() {
630
    return securityInfo;
1✔
631
  }
632

633
  @VisibleForTesting
634
  KeepAliveManager getKeepAliveManagerForTest() {
635
    return keepAliveManager;
1✔
636
  }
637

638
  @VisibleForTesting
639
  void setKeepAliveManagerForTest(KeepAliveManager keepAliveManager) {
640
    this.keepAliveManager = keepAliveManager;
1✔
641
  }
1✔
642

643
  /**
644
   * Handler for the Channel shutting down.
645
   */
646
  @Override
647
  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
648
    try {
649
      if (keepAliveManager != null) {
1✔
650
        keepAliveManager.onTransportTermination();
1✔
651
      }
652
      if (maxConnectionIdleManager != null) {
1✔
653
        maxConnectionIdleManager.onTransportTermination();
1✔
654
      }
655
      if (maxConnectionAgeMonitor != null) {
1✔
656
        maxConnectionAgeMonitor.cancel(false);
1✔
657
      }
658
      final Status status =
1✔
659
          Status.UNAVAILABLE.withDescription("connection terminated for unknown reason");
1✔
660
      // Any streams that are still active must be closed
661
      connection().forEachActiveStream(new Http2StreamVisitor() {
1✔
662
        @Override
663
        public boolean visit(Http2Stream stream) throws Http2Exception {
664
          NettyServerStream.TransportState serverStream = serverStream(stream);
1✔
665
          if (serverStream != null) {
1✔
666
            serverStream.transportReportStatus(status);
1✔
667
          }
668
          return true;
1✔
669
        }
670
      });
671
    } finally {
672
      super.channelInactive(ctx);
1✔
673
    }
674
  }
1✔
675

676
  WriteQueue getWriteQueue() {
677
    return serverWriteQueue;
1✔
678
  }
679

680
  /** Handler for commands sent from the stream. */
681
  @Override
682
  public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
683
      throws Exception {
684
    if (msg instanceof SendGrpcFrameCommand) {
1✔
685
      sendGrpcFrame(ctx, (SendGrpcFrameCommand) msg, promise);
1✔
686
    } else if (msg instanceof SendResponseHeadersCommand) {
1✔
687
      sendResponseHeaders(ctx, (SendResponseHeadersCommand) msg, promise);
1✔
688
    } else if (msg instanceof CancelServerStreamCommand) {
1✔
689
      cancelStream(ctx, (CancelServerStreamCommand) msg, promise);
1✔
690
    } else if (msg instanceof GracefulServerCloseCommand) {
1✔
691
      gracefulClose(ctx, (GracefulServerCloseCommand) msg, promise);
1✔
692
    } else if (msg instanceof ForcefulCloseCommand) {
1✔
693
      forcefulClose(ctx, (ForcefulCloseCommand) msg, promise);
1✔
694
    } else {
695
      AssertionError e =
×
696
          new AssertionError("Write called for unexpected type: " + msg.getClass().getName());
×
697
      ReferenceCountUtil.release(msg);
×
698
      promise.setFailure(e);
×
699
      throw e;
×
700
    }
701
  }
1✔
702

703
  @Override
704
  public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
705
    gracefulClose(ctx, new GracefulServerCloseCommand("app_requested"), promise);
1✔
706
    ctx.flush();
1✔
707
  }
1✔
708

709
  /**
710
   * Returns the given processed bytes back to inbound flow control.
711
   */
712
  void returnProcessedBytes(Http2Stream http2Stream, int bytes) {
713
    try {
714
      decoder().flowController().consumeBytes(http2Stream, bytes);
1✔
715
    } catch (Http2Exception e) {
×
716
      throw new RuntimeException(e);
×
717
    }
1✔
718
  }
1✔
719

720
  private void closeStreamWhenDone(ChannelPromise promise, Http2Stream stream) {
721
    promise.addListener(
1✔
722
        new ChannelFutureListener() {
1✔
723
          @Override
724
          public void operationComplete(ChannelFuture future) {
725
            serverStream(stream).complete();
1✔
726
          }
1✔
727
        });
728
  }
1✔
729

730
  private static void streamGone(int streamId, ChannelPromise promise) {
731
    promise.setFailure(
1✔
732
        new IllegalStateException(
733
            "attempting to write to stream " + streamId + " that no longer exists") {
1✔
734
          @Override
735
          public synchronized Throwable fillInStackTrace() {
736
            return this;
1✔
737
          }
738
        });
739
  }
1✔
740

741
  /** Sends the given gRPC frame to the client. */
742
  private void sendGrpcFrame(
743
      ChannelHandlerContext ctx, SendGrpcFrameCommand cmd, ChannelPromise promise)
744
      throws Http2Exception {
745
    try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.sendGrpcFrame")) {
1✔
746
      PerfMark.attachTag(cmd.stream().tag());
1✔
747
      PerfMark.linkIn(cmd.getLink());
1✔
748
      int streamId = cmd.stream().id();
1✔
749
      Http2Stream stream = connection().stream(streamId);
1✔
750
      if (stream == null) {
1✔
751
        cmd.release();
1✔
752
        streamGone(streamId, promise);
1✔
753
        return;
1✔
754
      }
755
      if (cmd.endStream()) {
1✔
756
        closeStreamWhenDone(promise, stream);
×
757
      }
758
      // Call the base class to write the HTTP/2 DATA frame.
759
      encoder().writeData(ctx, streamId, cmd.content(), 0, cmd.endStream(), promise);
1✔
760
    }
761
  }
1✔
762

763
  /**
764
   * Sends the response headers to the client.
765
   */
766
  private void sendResponseHeaders(ChannelHandlerContext ctx, SendResponseHeadersCommand cmd,
767
      ChannelPromise promise) throws Http2Exception {
768
    try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.sendResponseHeaders")) {
1✔
769
      PerfMark.attachTag(cmd.stream().tag());
1✔
770
      PerfMark.linkIn(cmd.getLink());
1✔
771
      int streamId = cmd.stream().id();
1✔
772
      Http2Stream stream = connection().stream(streamId);
1✔
773
      if (stream == null) {
1✔
774
        streamGone(streamId, promise);
1✔
775
        return;
1✔
776
      }
777
      if (cmd.endOfStream()) {
1✔
778
        closeStreamWhenDone(promise, stream);
1✔
779
      }
780
      encoder().writeHeaders(ctx, streamId, cmd.headers(), 0, cmd.endOfStream(), promise);
1✔
781
    }
782
  }
1✔
783

784
  private void cancelStream(ChannelHandlerContext ctx, CancelServerStreamCommand cmd,
785
      ChannelPromise promise) {
786
    try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.cancelStream")) {
1✔
787
      PerfMark.attachTag(cmd.stream().tag());
1✔
788
      PerfMark.linkIn(cmd.getLink());
1✔
789
      // Notify the listener if we haven't already.
790
      cmd.stream().transportReportStatus(cmd.reason());
1✔
791

792
      // Now we need to decide how we're going to notify the peer that this stream is closed.
793
      // If possible, it's nice to inform the peer _why_ this stream was cancelled by sending
794
      // a structured headers frame.
795
      if (shouldCloseStreamWithHeaders(cmd, connection())) {
1✔
796
        Metadata md = new Metadata();
1✔
797
        md.put(InternalStatus.CODE_KEY, cmd.reason());
1✔
798
        if (cmd.reason().getDescription() != null) {
1✔
799
          md.put(InternalStatus.MESSAGE_KEY, cmd.reason().getDescription());
1✔
800
        }
801
        Http2Headers headers = Utils.convertServerHeaders(md);
1✔
802
        encoder().writeHeaders(
1✔
803
            ctx, cmd.stream().id(), headers, /* padding = */ 0, /* endStream = */ true, promise);
1✔
804
      } else {
1✔
805
        // Terminate the stream.
806
        encoder().writeRstStream(ctx, cmd.stream().id(), Http2Error.CANCEL.code(), promise);
1✔
807
      }
808
    }
809
  }
1✔
810

811
  // Determine whether a CancelServerStreamCommand should try to close the stream with a
812
  // HEADERS or a RST_STREAM frame. The caller has some influence over this (they can
813
  // configure cmd.wantsHeaders()). The state of the stream also has an influence: we
814
  // only try to send HEADERS if the stream exists and hasn't already sent any headers.
815
  private static boolean shouldCloseStreamWithHeaders(
816
          CancelServerStreamCommand cmd, Http2Connection conn) {
817
    if (!cmd.wantsHeaders()) {
1✔
818
      return false;
1✔
819
    }
820
    Http2Stream stream = conn.stream(cmd.stream().id());
1✔
821
    return stream != null && !stream.isHeadersSent();
1✔
822
  }
823

824
  private void gracefulClose(final ChannelHandlerContext ctx, final GracefulServerCloseCommand msg,
825
      ChannelPromise promise) throws Exception {
826
    // Ideally we'd adjust a pre-existing graceful shutdown's grace period to at least what is
827
    // requested here. But that's an edge case and seems bug-prone.
828
    if (gracefulShutdown == null) {
1✔
829
      Long graceTimeInNanos = null;
1✔
830
      if (msg.getGraceTimeUnit() != null) {
1✔
831
        graceTimeInNanos = msg.getGraceTimeUnit().toNanos(msg.getGraceTime());
1✔
832
      }
833
      gracefulShutdown = new GracefulShutdown(msg.getGoAwayDebugString(), graceTimeInNanos);
1✔
834
      gracefulShutdown.start(ctx);
1✔
835
    }
836
    promise.setSuccess();
1✔
837
  }
1✔
838

839
  private void forcefulClose(final ChannelHandlerContext ctx, final ForcefulCloseCommand msg,
840
      ChannelPromise promise) throws Exception {
841
    super.close(ctx, promise);
1✔
842
    connection().forEachActiveStream(new Http2StreamVisitor() {
1✔
843
      @Override
844
      public boolean visit(Http2Stream stream) throws Http2Exception {
845
        NettyServerStream.TransportState serverStream = serverStream(stream);
1✔
846
        if (serverStream != null) {
1✔
847
          try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.forcefulClose")) {
1✔
848
            PerfMark.attachTag(serverStream.tag());
1✔
849
            PerfMark.linkIn(msg.getLink());
1✔
850
            serverStream.transportReportStatus(msg.getStatus());
1✔
851
            resetStream(ctx, stream.id(), Http2Error.CANCEL.code(), ctx.newPromise());
1✔
852
          }
853
        }
854
        stream.close();
1✔
855
        return true;
1✔
856
      }
857
    });
858
  }
1✔
859

860
  private void respondWithHttpError(
861
      ChannelHandlerContext ctx, int streamId, int code, Status.Code statusCode, String msg) {
862
    Metadata metadata = new Metadata();
1✔
863
    metadata.put(InternalStatus.CODE_KEY, statusCode.toStatus());
1✔
864
    metadata.put(InternalStatus.MESSAGE_KEY, msg);
1✔
865
    byte[][] serialized = InternalMetadata.serialize(metadata);
1✔
866

867
    Http2Headers headers = new DefaultHttp2Headers(true, serialized.length / 2)
1✔
868
        .status("" + code)
1✔
869
        .set(CONTENT_TYPE_HEADER, "text/plain; charset=utf-8");
1✔
870
    for (int i = 0; i < serialized.length; i += 2) {
1✔
871
      headers.add(new AsciiString(serialized[i], false), new AsciiString(serialized[i + 1], false));
1✔
872
    }
873
    encoder().writeHeaders(ctx, streamId, headers, 0, false, ctx.newPromise());
1✔
874
    ByteBuf msgBuf = ByteBufUtil.writeUtf8(ctx.alloc(), msg);
1✔
875
    encoder().writeData(ctx, streamId, msgBuf, 0, true, ctx.newPromise());
1✔
876
  }
1✔
877

878
  private Http2Stream requireHttp2Stream(int streamId) {
879
    Http2Stream stream = connection().stream(streamId);
1✔
880
    if (stream == null) {
1✔
881
      // This should never happen.
882
      throw new AssertionError("Stream does not exist: " + streamId);
×
883
    }
884
    return stream;
1✔
885
  }
886

887
  /**
888
   * Returns the server stream associated to the given HTTP/2 stream object.
889
   */
890
  private NettyServerStream.TransportState serverStream(Http2Stream stream) {
891
    return stream == null ? null : (NettyServerStream.TransportState) stream.getProperty(streamKey);
1✔
892
  }
893

894
  private Http2Exception newStreamException(int streamId, Throwable cause) {
895
    return Http2Exception.streamError(
1✔
896
        streamId, Http2Error.INTERNAL_ERROR, cause, Strings.nullToEmpty(cause.getMessage()));
1✔
897
  }
898

899
  private class FrameListener extends Http2FrameAdapter {
1✔
900
    private boolean firstSettings = true;
1✔
901

902
    @Override
903
    public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) {
904
      if (firstSettings) {
1✔
905
        firstSettings = false;
1✔
906
        // Delay transportReady until we see the client's HTTP handshake, for coverage with
907
        // handshakeTimeout
908
        attributes = transportListener.transportReady(negotiationAttributes);
1✔
909
      }
910
    }
1✔
911

912
    @Override
913
    public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
914
        boolean endOfStream) throws Http2Exception {
915
      if (keepAliveManager != null) {
1✔
916
        keepAliveManager.onDataReceived();
1✔
917
      }
918
      NettyServerHandler.this.onDataRead(streamId, data, padding, endOfStream);
1✔
919
      return padding;
1✔
920
    }
921

922
    @Override
923
    public void onHeadersRead(ChannelHandlerContext ctx,
924
        int streamId,
925
        Http2Headers headers,
926
        int streamDependency,
927
        short weight,
928
        boolean exclusive,
929
        int padding,
930
        boolean endStream) throws Http2Exception {
931
      if (keepAliveManager != null) {
1✔
932
        keepAliveManager.onDataReceived();
1✔
933
      }
934
      NettyServerHandler.this.onHeadersRead(ctx, streamId, headers);
1✔
935
      if (endStream) {
1✔
936
        NettyServerHandler.this.onDataRead(streamId, Unpooled.EMPTY_BUFFER, 0, endStream);
×
937
      }
938
    }
1✔
939

940
    @Override
941
    public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode)
942
        throws Http2Exception {
943
      if (keepAliveManager != null) {
1✔
944
        keepAliveManager.onDataReceived();
1✔
945
      }
946
      NettyServerHandler.this.onRstStreamRead(streamId, errorCode);
1✔
947
    }
1✔
948

949
    @Override
950
    public void onPingRead(ChannelHandlerContext ctx, long data) throws Http2Exception {
951
      if (keepAliveManager != null) {
1✔
952
        keepAliveManager.onDataReceived();
1✔
953
      }
954
      if (!keepAliveEnforcer.pingAcceptable()) {
1✔
955
        ByteBuf debugData = ByteBufUtil.writeAscii(ctx.alloc(), "too_many_pings");
1✔
956
        goAway(ctx, connection().remote().lastStreamCreated(), Http2Error.ENHANCE_YOUR_CALM.code(),
1✔
957
            debugData, ctx.newPromise());
1✔
958
        Status status = Status.RESOURCE_EXHAUSTED.withDescription("Too many pings from client");
1✔
959
        try {
960
          forcefulClose(ctx, new ForcefulCloseCommand(status), ctx.newPromise());
1✔
961
        } catch (Exception ex) {
×
962
          onError(ctx, /* outbound= */ true, ex);
×
963
        }
1✔
964
      }
965
    }
1✔
966

967
    @Override
968
    public void onPingAckRead(ChannelHandlerContext ctx, long data) throws Http2Exception {
969
      if (keepAliveManager != null) {
1✔
970
        keepAliveManager.onDataReceived();
1✔
971
      }
972
      if (data == flowControlPing().payload()) {
1✔
973
        flowControlPing().updateWindow();
1✔
974
        logger.log(Level.FINE, "Window: {0}",
1✔
975
            decoder().flowController().initialWindowSize(connection().connectionStream()));
1✔
976
      } else if (data == GRACEFUL_SHUTDOWN_PING) {
1✔
977
        if (gracefulShutdown == null) {
1✔
978
          // this should never happen
979
          logger.warning("Received GRACEFUL_SHUTDOWN_PING Ack but gracefulShutdown is null");
×
980
        } else {
981
          gracefulShutdown.secondGoAwayAndClose(ctx);
1✔
982
        }
983
      } else if (data != KEEPALIVE_PING) {
1✔
984
        logger.warning("Received unexpected ping ack. No ping outstanding");
×
985
      }
986
    }
1✔
987
  }
988

989
  private final class KeepAlivePinger implements KeepAliveManager.KeepAlivePinger {
990
    final ChannelHandlerContext ctx;
991

992
    KeepAlivePinger(ChannelHandlerContext ctx) {
1✔
993
      this.ctx = ctx;
1✔
994
    }
1✔
995

996
    @Override
997
    public void ping() {
998
      ChannelFuture pingFuture = encoder().writePing(
1✔
999
          ctx, false /* isAck */, KEEPALIVE_PING, ctx.newPromise());
1✔
1000
      ctx.flush();
1✔
1001
      pingFuture.addListener(new ChannelFutureListener() {
1✔
1002
        @Override
1003
        public void operationComplete(ChannelFuture future) throws Exception {
1004
          if (future.isSuccess()) {
1✔
1005
            transportTracer.reportKeepAliveSent();
1✔
1006
          }
1007
        }
1✔
1008
      });
1009
    }
1✔
1010

1011
    @Override
1012
    public void onPingTimeout() {
1013
      try {
1014
        forcefulClose(
1✔
1015
            ctx,
1016
            new ForcefulCloseCommand(Status.UNAVAILABLE
1017
                .withDescription("Keepalive failed. The connection is likely gone")),
1✔
1018
            ctx.newPromise());
1✔
1019
      } catch (Exception ex) {
×
1020
        try {
1021
          exceptionCaught(ctx, ex);
×
1022
        } catch (Exception ex2) {
×
1023
          logger.log(Level.WARNING, "Exception while propagating exception", ex2);
×
1024
          logger.log(Level.WARNING, "Original failure", ex);
×
1025
        }
×
1026
      }
1✔
1027
    }
1✔
1028
  }
1029

1030
  private final class GracefulShutdown {
1031
    String goAwayMessage;
1032

1033
    /**
1034
     * The grace time between starting graceful shutdown and closing the netty channel,
1035
     * {@code null} is unspecified.
1036
     */
1037
    @CheckForNull
1038
    Long graceTimeInNanos;
1039

1040
    /**
1041
     * True if ping is Acked or ping is timeout.
1042
     */
1043
    boolean pingAckedOrTimeout;
1044

1045
    Future<?> pingFuture;
1046

1047
    GracefulShutdown(String goAwayMessage,
1048
        @Nullable Long graceTimeInNanos) {
1✔
1049
      this.goAwayMessage = goAwayMessage;
1✔
1050
      this.graceTimeInNanos = graceTimeInNanos;
1✔
1051
    }
1✔
1052

1053
    /**
1054
     * Sends out first GOAWAY and ping, and schedules second GOAWAY and close.
1055
     */
1056
    void start(final ChannelHandlerContext ctx) {
1057
      goAway(
1✔
1058
          ctx,
1059
          Integer.MAX_VALUE,
1060
          Http2Error.NO_ERROR.code(),
1✔
1061
          ByteBufUtil.writeAscii(ctx.alloc(), goAwayMessage),
1✔
1062
          ctx.newPromise());
1✔
1063

1064
      pingFuture = ctx.executor().schedule(
1✔
1065
          new Runnable() {
1✔
1066
            @Override
1067
            public void run() {
1068
              secondGoAwayAndClose(ctx);
1✔
1069
            }
1✔
1070
          },
1071
          GRACEFUL_SHUTDOWN_PING_TIMEOUT_NANOS,
1✔
1072
          TimeUnit.NANOSECONDS);
1073

1074
      encoder().writePing(ctx, false /* isAck */, GRACEFUL_SHUTDOWN_PING, ctx.newPromise());
1✔
1075
    }
1✔
1076

1077
    void secondGoAwayAndClose(ChannelHandlerContext ctx) {
1078
      if (pingAckedOrTimeout) {
1✔
1079
        return;
×
1080
      }
1081
      pingAckedOrTimeout = true;
1✔
1082

1083
      checkNotNull(pingFuture, "pingFuture");
1✔
1084
      pingFuture.cancel(false);
1✔
1085

1086
      // send the second GOAWAY with last stream id
1087
      goAway(
1✔
1088
          ctx,
1089
          connection().remote().lastStreamCreated(),
1✔
1090
          Http2Error.NO_ERROR.code(),
1✔
1091
          ByteBufUtil.writeAscii(ctx.alloc(), goAwayMessage),
1✔
1092
          ctx.newPromise());
1✔
1093

1094
      // gracefully shutdown with specified grace time
1095
      long savedGracefulShutdownTimeMillis = gracefulShutdownTimeoutMillis();
1✔
1096
      long overriddenGraceTime = graceTimeOverrideMillis(savedGracefulShutdownTimeMillis);
1✔
1097
      try {
1098
        gracefulShutdownTimeoutMillis(overriddenGraceTime);
1✔
1099
        NettyServerHandler.super.close(ctx, ctx.newPromise());
1✔
1100
      } catch (Exception e) {
×
1101
        onError(ctx, /* outbound= */ true, e);
×
1102
      } finally {
1103
        gracefulShutdownTimeoutMillis(savedGracefulShutdownTimeMillis);
1✔
1104
      }
1105
    }
1✔
1106

1107
    private long graceTimeOverrideMillis(long originalMillis) {
1108
      if (graceTimeInNanos == null) {
1✔
1109
        return originalMillis;
1✔
1110
      }
1111
      if (graceTimeInNanos == MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE) {
1✔
1112
        // netty treats -1 as "no timeout"
1113
        return -1L;
1✔
1114
      }
1115
      return TimeUnit.NANOSECONDS.toMillis(graceTimeInNanos);
1✔
1116
    }
1117
  }
1118

1119
  // Use a frame writer so that we know when frames are through flow control and actually being
1120
  // written.
1121
  private static class WriteMonitoringFrameWriter extends DecoratingHttp2FrameWriter {
1122
    private final KeepAliveEnforcer keepAliveEnforcer;
1123

1124
    public WriteMonitoringFrameWriter(Http2FrameWriter delegate,
1125
        KeepAliveEnforcer keepAliveEnforcer) {
1126
      super(delegate);
1✔
1127
      this.keepAliveEnforcer = keepAliveEnforcer;
1✔
1128
    }
1✔
1129

1130
    @Override
1131
    public ChannelFuture writeData(ChannelHandlerContext ctx, int streamId, ByteBuf data,
1132
        int padding, boolean endStream, ChannelPromise promise) {
1133
      keepAliveEnforcer.resetCounters();
1✔
1134
      return super.writeData(ctx, streamId, data, padding, endStream, promise);
1✔
1135
    }
1136

1137
    @Override
1138
    public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
1139
        int padding, boolean endStream, ChannelPromise promise) {
1140
      keepAliveEnforcer.resetCounters();
1✔
1141
      return super.writeHeaders(ctx, streamId, headers, padding, endStream, promise);
1✔
1142
    }
1143

1144
    @Override
1145
    public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
1146
        int streamDependency, short weight, boolean exclusive, int padding, boolean endStream,
1147
        ChannelPromise promise) {
1148
      keepAliveEnforcer.resetCounters();
×
1149
      return super.writeHeaders(ctx, streamId, headers, streamDependency, weight, exclusive,
×
1150
          padding, endStream, promise);
1151
    }
1152
  }
1153

1154
  private static class ServerChannelLogger extends ChannelLogger {
1155
    private static final Logger log = Logger.getLogger(ChannelLogger.class.getName());
1✔
1156

1157
    @Override
1158
    public void log(ChannelLogLevel level, String message) {
1159
      log.log(toJavaLogLevel(level), message);
1✔
1160
    }
1✔
1161

1162
    @Override
1163
    public void log(ChannelLogLevel level, String messageFormat, Object... args) {
1164
      log(level, MessageFormat.format(messageFormat, args));
1✔
1165
    }
1✔
1166
  }
1167

1168
  private static Level toJavaLogLevel(ChannelLogLevel level) {
1169
    switch (level) {
1✔
1170
      case ERROR:
1171
        return Level.FINE;
×
1172
      case WARNING:
1173
        return Level.FINER;
×
1174
      default:
1175
        return Level.FINEST;
1✔
1176
    }
1177
  }
1178
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc