• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19650

17 Jan 2025 09:28AM CUT coverage: 88.568% (-0.02%) from 88.589%
#19650

push

github

web-flow
xds: Envoy proto sync to 2024-11-11 (#11816)

33716 of 38068 relevant lines covered (88.57%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.37
/../netty/src/main/java/io/grpc/netty/NettyServerHandler.java
1
/*
2
 * Copyright 2014 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.netty;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static io.grpc.internal.GrpcUtil.SERVER_KEEPALIVE_TIME_NANOS_DISABLED;
22
import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE;
23
import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_AGE_NANOS_DISABLED;
24
import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_IDLE_NANOS_DISABLED;
25
import static io.grpc.netty.Utils.CONTENT_TYPE_HEADER;
26
import static io.grpc.netty.Utils.HTTP_METHOD;
27
import static io.grpc.netty.Utils.TE_HEADER;
28
import static io.grpc.netty.Utils.TE_TRAILERS;
29
import static io.netty.handler.codec.http.HttpHeaderNames.CONNECTION;
30
import static io.netty.handler.codec.http.HttpHeaderNames.HOST;
31
import static io.netty.handler.codec.http2.DefaultHttp2LocalFlowController.DEFAULT_WINDOW_UPDATE_RATIO;
32
import static io.netty.handler.codec.http2.Http2Headers.PseudoHeaderName.AUTHORITY;
33

34
import com.google.common.annotations.VisibleForTesting;
35
import com.google.common.base.Preconditions;
36
import com.google.common.base.Strings;
37
import com.google.common.base.Ticker;
38
import io.grpc.Attributes;
39
import io.grpc.ChannelLogger;
40
import io.grpc.ChannelLogger.ChannelLogLevel;
41
import io.grpc.InternalChannelz;
42
import io.grpc.InternalMetadata;
43
import io.grpc.InternalStatus;
44
import io.grpc.Metadata;
45
import io.grpc.ServerStreamTracer;
46
import io.grpc.Status;
47
import io.grpc.internal.GrpcUtil;
48
import io.grpc.internal.KeepAliveEnforcer;
49
import io.grpc.internal.KeepAliveManager;
50
import io.grpc.internal.LogExceptionRunnable;
51
import io.grpc.internal.MaxConnectionIdleManager;
52
import io.grpc.internal.ServerTransportListener;
53
import io.grpc.internal.StatsTraceContext;
54
import io.grpc.internal.TransportTracer;
55
import io.grpc.netty.GrpcHttp2HeadersUtils.GrpcHttp2ServerHeadersDecoder;
56
import io.netty.buffer.ByteBuf;
57
import io.netty.buffer.ByteBufUtil;
58
import io.netty.buffer.Unpooled;
59
import io.netty.channel.ChannelFuture;
60
import io.netty.channel.ChannelFutureListener;
61
import io.netty.channel.ChannelHandlerContext;
62
import io.netty.channel.ChannelPromise;
63
import io.netty.handler.codec.http2.DecoratingHttp2FrameWriter;
64
import io.netty.handler.codec.http2.DefaultHttp2Connection;
65
import io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder;
66
import io.netty.handler.codec.http2.DefaultHttp2ConnectionEncoder;
67
import io.netty.handler.codec.http2.DefaultHttp2FrameReader;
68
import io.netty.handler.codec.http2.DefaultHttp2FrameWriter;
69
import io.netty.handler.codec.http2.DefaultHttp2Headers;
70
import io.netty.handler.codec.http2.DefaultHttp2LocalFlowController;
71
import io.netty.handler.codec.http2.DefaultHttp2RemoteFlowController;
72
import io.netty.handler.codec.http2.Http2Connection;
73
import io.netty.handler.codec.http2.Http2ConnectionAdapter;
74
import io.netty.handler.codec.http2.Http2ConnectionDecoder;
75
import io.netty.handler.codec.http2.Http2ConnectionEncoder;
76
import io.netty.handler.codec.http2.Http2Error;
77
import io.netty.handler.codec.http2.Http2Exception;
78
import io.netty.handler.codec.http2.Http2Exception.StreamException;
79
import io.netty.handler.codec.http2.Http2FrameAdapter;
80
import io.netty.handler.codec.http2.Http2FrameLogger;
81
import io.netty.handler.codec.http2.Http2FrameReader;
82
import io.netty.handler.codec.http2.Http2FrameWriter;
83
import io.netty.handler.codec.http2.Http2Headers;
84
import io.netty.handler.codec.http2.Http2HeadersDecoder;
85
import io.netty.handler.codec.http2.Http2InboundFrameLogger;
86
import io.netty.handler.codec.http2.Http2OutboundFrameLogger;
87
import io.netty.handler.codec.http2.Http2Settings;
88
import io.netty.handler.codec.http2.Http2Stream;
89
import io.netty.handler.codec.http2.Http2StreamVisitor;
90
import io.netty.handler.codec.http2.WeightedFairQueueByteDistributor;
91
import io.netty.handler.logging.LogLevel;
92
import io.netty.util.AsciiString;
93
import io.netty.util.ReferenceCountUtil;
94
import io.perfmark.PerfMark;
95
import io.perfmark.Tag;
96
import io.perfmark.TaskCloseable;
97
import java.text.MessageFormat;
98
import java.util.List;
99
import java.util.concurrent.Future;
100
import java.util.concurrent.ScheduledFuture;
101
import java.util.concurrent.TimeUnit;
102
import java.util.logging.Level;
103
import java.util.logging.Logger;
104
import javax.annotation.CheckForNull;
105
import javax.annotation.Nullable;
106

107
/**
108
 * Server-side Netty handler for GRPC processing. All event handlers are executed entirely within
109
 * the context of the Netty Channel thread.
110
 */
111
class NettyServerHandler extends AbstractNettyHandler {
112
  // Class-wide java.util.logging logger, named after this class.
  private static final Logger logger = Logger.getLogger(NettyServerHandler.class.getName());
1✔
113
  // Ping payload constant. NOTE(review): presumably the payload of keepalive pings sent by this
  // handler; the code that uses it is outside this excerpt - confirm.
  private static final long KEEPALIVE_PING = 0xDEADL;
114
  @VisibleForTesting
115
  // Ping payload constant. NOTE(review): presumably identifies the ping sent as part of graceful
  // shutdown; the code that uses it is outside this excerpt - confirm.
  static final long GRACEFUL_SHUTDOWN_PING = 0x97ACEF001L;
116
  // Ten seconds, expressed in nanoseconds.
  private static final long GRACEFUL_SHUTDOWN_PING_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(10);
1✔
117
  /** Temporary workaround for #8674. Fine to delete after v1.45 release, and maybe earlier. */
118
  // Read once at class-initialization time from the system property
  // "io.grpc.netty.disableConnectionHeaderCheck" (defaults to false). When true, onHeadersRead()
  // no longer resets streams whose request headers contain a Connection header.
  private static final boolean DISABLE_CONNECTION_HEADER_CHECK = Boolean.parseBoolean(
1✔
119
      System.getProperty("io.grpc.netty.disableConnectionHeaderCheck", "false"));
1✔
120

121
  // Property key under which onHeadersRead() stores each stream's
  // NettyServerStream.TransportState on its Http2Stream.
  private final Http2Connection.PropertyKey streamKey;
122
  // Notified via streamCreated() for every accepted request stream.
  private final ServerTransportListener transportListener;
123
  // Passed to every NettyServerStream.TransportState created in onHeadersRead().
  private final int maxMessageSize;
124
  // Keepalive is enabled in handlerAdded() unless this equals
  // SERVER_KEEPALIVE_TIME_NANOS_DISABLED.
  private final long keepAliveTimeInNanos;
125
  private final long keepAliveTimeoutInNanos;
126
  // The max-age shutdown task is scheduled in handlerAdded() unless this equals
  // MAX_CONNECTION_AGE_NANOS_DISABLED.
  private final long maxConnectionAgeInNanos;
127
  // Handed to the GracefulShutdown started by the max-age task.
  private final long maxConnectionAgeGraceInNanos;
128
  // RST_STREAM rate limit: more than maxRstCount resets within maxRstPeriodNanos raises an
  // ENHANCE_YOUR_CALM connection error. A non-positive count disables the check.
  private final int maxRstCount;
129
  private final long maxRstPeriodNanos;
130
  private final List<? extends ServerStreamTracer.Factory> streamTracerFactories;
131
  private final TransportTracer transportTracer;
132
  private final KeepAliveEnforcer keepAliveEnforcer;
133
  private final Attributes eagAttributes;
134
  // Time source for the RST_STREAM rate limit.
  private final Ticker ticker;
135
  /** Incomplete attributes produced by negotiator. */
136
  private Attributes negotiationAttributes;
137
  private InternalChannelz.Security securityInfo;
138
  /** Completed attributes produced by transportReady. */
139
  private Attributes attributes;
140
  // Cause recorded by the most recent onConnectionError() call.
  private Throwable connectionError;
141
  // Ensures the "Expected header TE" warning is logged at most once per handler.
  private boolean teWarningLogged;
142
  // Created in handlerAdded() for the handler's channel.
  private WriteQueue serverWriteQueue;
143
  // Most recently seen :authority value; see getOrUpdateAuthority().
  private AsciiString lastKnownAuthority;
144
  @CheckForNull
145
  private KeepAliveManager keepAliveManager;
146
  @CheckForNull
147
  private MaxConnectionIdleManager maxConnectionIdleManager;
148
  @CheckForNull
149
  private ScheduledFuture<?> maxConnectionAgeMonitor;
150
  @CheckForNull
151
  // Non-null once a graceful shutdown has been started by the max-age or max-idle task in
  // handlerAdded(); both tasks skip starting a second one.
  private GracefulShutdown gracefulShutdown;
152
  // Number of RST_STREAM frames counted in the current rate-limit window.
  private int rstCount;
153
  // Ticker reading taken at construction and again whenever a new rate-limit window starts.
  private long lastRstNanoTime;
154

155
  static NettyServerHandler newHandler(
156
      ServerTransportListener transportListener,
157
      ChannelPromise channelUnused,
158
      List<? extends ServerStreamTracer.Factory> streamTracerFactories,
159
      TransportTracer transportTracer,
160
      int maxStreams,
161
      boolean autoFlowControl,
162
      int flowControlWindow,
163
      int maxHeaderListSize,
164
      int softLimitHeaderListSize,
165
      int maxMessageSize,
166
      long keepAliveTimeInNanos,
167
      long keepAliveTimeoutInNanos,
168
      long maxConnectionIdleInNanos,
169
      long maxConnectionAgeInNanos,
170
      long maxConnectionAgeGraceInNanos,
171
      boolean permitKeepAliveWithoutCalls,
172
      long permitKeepAliveTimeInNanos,
173
      int maxRstCount,
174
      long maxRstPeriodNanos,
175
      Attributes eagAttributes) {
176
    Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive: %s",
1✔
177
        maxHeaderListSize);
178
    Http2FrameLogger frameLogger = new Http2FrameLogger(LogLevel.DEBUG, NettyServerHandler.class);
1✔
179
    Http2HeadersDecoder headersDecoder = new GrpcHttp2ServerHeadersDecoder(maxHeaderListSize);
1✔
180
    Http2FrameReader frameReader = new Http2InboundFrameLogger(
1✔
181
        new DefaultHttp2FrameReader(headersDecoder), frameLogger);
182
    Http2FrameWriter frameWriter =
1✔
183
        new Http2OutboundFrameLogger(new DefaultHttp2FrameWriter(), frameLogger);
184
    return newHandler(
1✔
185
        channelUnused,
186
        frameReader,
187
        frameWriter,
188
        transportListener,
189
        streamTracerFactories,
190
        transportTracer,
191
        maxStreams,
192
        autoFlowControl,
193
        flowControlWindow,
194
        maxHeaderListSize,
195
        softLimitHeaderListSize,
196
        maxMessageSize,
197
        keepAliveTimeInNanos,
198
        keepAliveTimeoutInNanos,
199
        maxConnectionIdleInNanos,
200
        maxConnectionAgeInNanos,
201
        maxConnectionAgeGraceInNanos,
202
        permitKeepAliveWithoutCalls,
203
        permitKeepAliveTimeInNanos,
204
        maxRstCount,
205
        maxRstPeriodNanos,
206
        eagAttributes,
207
        Ticker.systemTicker());
1✔
208
  }
209

210
  static NettyServerHandler newHandler(
211
      ChannelPromise channelUnused,
212
      Http2FrameReader frameReader,
213
      Http2FrameWriter frameWriter,
214
      ServerTransportListener transportListener,
215
      List<? extends ServerStreamTracer.Factory> streamTracerFactories,
216
      TransportTracer transportTracer,
217
      int maxStreams,
218
      boolean autoFlowControl,
219
      int flowControlWindow,
220
      int maxHeaderListSize,
221
      int softLimitHeaderListSize,
222
      int maxMessageSize,
223
      long keepAliveTimeInNanos,
224
      long keepAliveTimeoutInNanos,
225
      long maxConnectionIdleInNanos,
226
      long maxConnectionAgeInNanos,
227
      long maxConnectionAgeGraceInNanos,
228
      boolean permitKeepAliveWithoutCalls,
229
      long permitKeepAliveTimeInNanos,
230
      int maxRstCount,
231
      long maxRstPeriodNanos,
232
      Attributes eagAttributes,
233
      Ticker ticker) {
234
    Preconditions.checkArgument(maxStreams > 0, "maxStreams must be positive: %s", maxStreams);
1✔
235
    Preconditions.checkArgument(flowControlWindow > 0, "flowControlWindow must be positive: %s",
1✔
236
        flowControlWindow);
237
    Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive: %s",
1✔
238
        maxHeaderListSize);
239
    Preconditions.checkArgument(
1✔
240
        softLimitHeaderListSize > 0, "softLimitHeaderListSize must be positive: %s",
241
        softLimitHeaderListSize);
242
    Preconditions.checkArgument(maxMessageSize > 0, "maxMessageSize must be positive: %s",
1✔
243
        maxMessageSize);
244

245
    final Http2Connection connection = new DefaultHttp2Connection(true);
1✔
246
    WeightedFairQueueByteDistributor dist = new WeightedFairQueueByteDistributor(connection);
1✔
247
    dist.allocationQuantum(16 * 1024); // Make benchmarks fast again.
1✔
248
    DefaultHttp2RemoteFlowController controller =
1✔
249
        new DefaultHttp2RemoteFlowController(connection, dist);
250
    connection.remote().flowController(controller);
1✔
251
    final KeepAliveEnforcer keepAliveEnforcer = new KeepAliveEnforcer(
1✔
252
        permitKeepAliveWithoutCalls, permitKeepAliveTimeInNanos, TimeUnit.NANOSECONDS);
253

254
    // Create the local flow controller configured to auto-refill the connection window.
255
    connection.local().flowController(
1✔
256
        new DefaultHttp2LocalFlowController(connection, DEFAULT_WINDOW_UPDATE_RATIO, true));
257
    frameWriter = new WriteMonitoringFrameWriter(frameWriter, keepAliveEnforcer);
1✔
258
    Http2ConnectionEncoder encoder =
1✔
259
        new DefaultHttp2ConnectionEncoder(connection, frameWriter);
260
    encoder = new Http2ControlFrameLimitEncoder(encoder, 10000);
1✔
261
    Http2ConnectionDecoder decoder = new DefaultHttp2ConnectionDecoder(connection, encoder,
1✔
262
        frameReader);
263

264
    Http2Settings settings = new Http2Settings();
1✔
265
    settings.initialWindowSize(flowControlWindow);
1✔
266
    settings.maxConcurrentStreams(maxStreams);
1✔
267
    settings.maxHeaderListSize(maxHeaderListSize);
1✔
268

269
    if (ticker == null) {
1✔
270
      ticker = Ticker.systemTicker();
×
271
    }
272

273
    return new NettyServerHandler(
1✔
274
        channelUnused,
275
        connection,
276
        transportListener,
277
        streamTracerFactories,
278
        transportTracer,
279
        decoder, encoder, settings,
280
        maxMessageSize,
281
        maxHeaderListSize,
282
        softLimitHeaderListSize,
283
        keepAliveTimeInNanos,
284
        keepAliveTimeoutInNanos,
285
        maxConnectionIdleInNanos,
286
        maxConnectionAgeInNanos, maxConnectionAgeGraceInNanos,
287
        keepAliveEnforcer,
288
        autoFlowControl,
289
        maxRstCount,
290
        maxRstPeriodNanos,
291
        eagAttributes, ticker);
292
  }
293

294
  /**
   * Initializes the handler state, registers a connection listener that reports active/idle
   * transitions to the keepalive enforcer and the optional idle manager, and installs the frame
   * listener on the decoder.
   *
   * <p>Idle tracking is skipped when {@code maxConnectionIdleInNanos} equals
   * {@code MAX_CONNECTION_IDLE_NANOS_DISABLED}.
   */
  private NettyServerHandler(
      ChannelPromise channelUnused,
      final Http2Connection connection,
      ServerTransportListener transportListener,
      List<? extends ServerStreamTracer.Factory> streamTracerFactories,
      TransportTracer transportTracer,
      Http2ConnectionDecoder decoder,
      Http2ConnectionEncoder encoder,
      Http2Settings settings,
      int maxMessageSize,
      int maxHeaderListSize,
      int softLimitHeaderListSize,
      long keepAliveTimeInNanos,
      long keepAliveTimeoutInNanos,
      long maxConnectionIdleInNanos,
      long maxConnectionAgeInNanos,
      long maxConnectionAgeGraceInNanos,
      final KeepAliveEnforcer keepAliveEnforcer,
      boolean autoFlowControl,
      int maxRstCount,
      long maxRstPeriodNanos,
      Attributes eagAttributes,
      Ticker ticker) {
    super(
        channelUnused,
        decoder,
        encoder,
        settings,
        new ServerChannelLogger(),
        autoFlowControl,
        null,
        ticker,
        maxHeaderListSize,
        softLimitHeaderListSize);

    // A null manager means connection-idle tracking is disabled.
    final MaxConnectionIdleManager idleManager =
        (maxConnectionIdleInNanos == MAX_CONNECTION_IDLE_NANOS_DISABLED)
            ? null
            : new MaxConnectionIdleManager(maxConnectionIdleInNanos);

    connection.addListener(new Http2ConnectionAdapter() {
      @Override
      public void onStreamActive(Http2Stream stream) {
        // Act only when this is the sole active stream on the connection.
        if (connection.numActiveStreams() != 1) {
          return;
        }
        keepAliveEnforcer.onTransportActive();
        if (idleManager != null) {
          idleManager.onTransportActive();
        }
      }

      @Override
      public void onStreamClosed(Http2Stream stream) {
        // Act only when no active stream remains on the connection.
        if (connection.numActiveStreams() != 0) {
          return;
        }
        keepAliveEnforcer.onTransportIdle();
        if (idleManager != null) {
          idleManager.onTransportIdle();
        }
      }
    });

    checkArgument(maxMessageSize >= 0, "maxMessageSize must be non-negative: %s", maxMessageSize);
    this.maxMessageSize = maxMessageSize;
    this.keepAliveTimeInNanos = keepAliveTimeInNanos;
    this.keepAliveTimeoutInNanos = keepAliveTimeoutInNanos;
    this.maxConnectionIdleManager = idleManager;
    this.maxConnectionAgeInNanos = maxConnectionAgeInNanos;
    this.maxConnectionAgeGraceInNanos = maxConnectionAgeGraceInNanos;
    this.keepAliveEnforcer = checkNotNull(keepAliveEnforcer, "keepAliveEnforcer");
    this.maxRstCount = maxRstCount;
    this.maxRstPeriodNanos = maxRstPeriodNanos;
    this.eagAttributes = checkNotNull(eagAttributes, "eagAttributes");
    this.ticker = checkNotNull(ticker, "ticker");

    // The RST_STREAM rate-limit window starts at construction time.
    this.lastRstNanoTime = ticker.read();
    this.streamKey = encoder.connection().newKey();
    this.transportListener = checkNotNull(transportListener, "transportListener");
    this.streamTracerFactories = checkNotNull(streamTracerFactories, "streamTracerFactories");
    this.transportTracer = checkNotNull(transportTracer, "transportTracer");

    // Set the frame listener on the decoder.
    decoder().frameListener(new FrameListener());
  }
1✔
379

380
  @Nullable
381
  Throwable connectionError() {
382
    return connectionError;
1✔
383
  }
384

385
  @Override
386
  public void handlerAdded(final ChannelHandlerContext ctx) throws Exception {
387
    serverWriteQueue = new WriteQueue(ctx.channel());
1✔
388

389
    // init max connection age monitor
390
    if (maxConnectionAgeInNanos != MAX_CONNECTION_AGE_NANOS_DISABLED) {
1✔
391
      maxConnectionAgeMonitor = ctx.executor().schedule(
1✔
392
          new LogExceptionRunnable(new Runnable() {
1✔
393
            @Override
394
            public void run() {
395
              if (gracefulShutdown == null) {
1✔
396
                gracefulShutdown = new GracefulShutdown("max_age", maxConnectionAgeGraceInNanos);
1✔
397
                gracefulShutdown.start(ctx);
1✔
398
                ctx.flush();
1✔
399
              }
400
            }
1✔
401
          }),
402
          maxConnectionAgeInNanos,
403
          TimeUnit.NANOSECONDS);
404
    }
405

406
    if (maxConnectionIdleManager != null) {
1✔
407
      maxConnectionIdleManager.start(new Runnable() {
1✔
408
        @Override
409
        public void run() {
410
          if (gracefulShutdown == null) {
1✔
411
            gracefulShutdown = new GracefulShutdown("max_idle", null);
1✔
412
            gracefulShutdown.start(ctx);
1✔
413
            ctx.flush();
1✔
414
          }
415
        }
1✔
416
      }, ctx.executor());
1✔
417
    }
418

419
    if (keepAliveTimeInNanos != SERVER_KEEPALIVE_TIME_NANOS_DISABLED) {
1✔
420
      keepAliveManager = new KeepAliveManager(new KeepAlivePinger(ctx), ctx.executor(),
1✔
421
          keepAliveTimeInNanos, keepAliveTimeoutInNanos, true /* keepAliveDuringTransportIdle */);
422
      keepAliveManager.onTransportStarted();
1✔
423
    }
424

425
    assert encoder().connection().equals(decoder().connection());
1✔
426
    transportTracer.setFlowControlWindowReader(new Utils.FlowControlReader(encoder().connection()));
1✔
427

428
    super.handlerAdded(ctx);
1✔
429
  }
1✔
430

431
  private void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers)
432
      throws Http2Exception {
433
    try {
434
      // Connection-specific header fields makes a request malformed. Ideally this would be handled
435
      // by Netty. RFC 7540 section 8.1.2.2
436
      if (!DISABLE_CONNECTION_HEADER_CHECK && headers.contains(CONNECTION)) {
1✔
437
        resetStream(ctx, streamId, Http2Error.PROTOCOL_ERROR.code(), ctx.newPromise());
×
438
        return;
×
439
      }
440

441
      if (headers.authority() == null) {
1✔
442
        List<CharSequence> hosts = headers.getAll(HOST);
1✔
443
        if (hosts.size() > 1) {
1✔
444
          // RFC 7230 section 5.4
445
          respondWithHttpError(ctx, streamId, 400, Status.Code.INTERNAL,
1✔
446
              "Multiple host headers");
447
          return;
1✔
448
        }
449
        if (!hosts.isEmpty()) {
1✔
450
          headers.add(AUTHORITY.value(), hosts.get(0));
1✔
451
        }
452
      }
453
      headers.remove(HOST);
1✔
454

455
      // Remove the leading slash of the path and get the fully qualified method name
456
      CharSequence path = headers.path();
1✔
457

458
      if (path == null) {
1✔
459
        respondWithHttpError(ctx, streamId, 404, Status.Code.UNIMPLEMENTED,
1✔
460
            "Expected path but is missing");
461
        return;
1✔
462
      }
463

464
      if (path.charAt(0) != '/') {
1✔
465
        respondWithHttpError(ctx, streamId, 404, Status.Code.UNIMPLEMENTED,
1✔
466
            String.format("Expected path to start with /: %s", path));
1✔
467
        return;
1✔
468
      }
469

470
      String method = path.subSequence(1, path.length()).toString();
1✔
471

472
      // Verify that the Content-Type is correct in the request.
473
      CharSequence contentType = headers.get(CONTENT_TYPE_HEADER);
1✔
474
      if (contentType == null) {
1✔
475
        respondWithHttpError(
1✔
476
            ctx, streamId, 415, Status.Code.INTERNAL, "Content-Type is missing from the request");
477
        return;
1✔
478
      }
479
      String contentTypeString = contentType.toString();
1✔
480
      if (!GrpcUtil.isGrpcContentType(contentTypeString)) {
1✔
481
        respondWithHttpError(ctx, streamId, 415, Status.Code.INTERNAL,
1✔
482
            String.format("Content-Type '%s' is not supported", contentTypeString));
1✔
483
        return;
1✔
484
      }
485

486
      if (!HTTP_METHOD.contentEquals(headers.method())) {
1✔
487
        respondWithHttpError(ctx, streamId, 405, Status.Code.INTERNAL,
1✔
488
            String.format("Method '%s' is not supported", headers.method()));
1✔
489
        return;
1✔
490
      }
491

492
      int h2HeadersSize = Utils.getH2HeadersSize(headers);
1✔
493
      if (Utils.shouldRejectOnMetadataSizeSoftLimitExceeded(
1✔
494
              h2HeadersSize, softLimitHeaderListSize, maxHeaderListSize)) {
495
        respondWithHttpError(ctx, streamId, 431, Status.Code.RESOURCE_EXHAUSTED, String.format(
×
496
                "Client Headers of size %d exceeded Metadata size soft limit: %d",
497
                h2HeadersSize,
×
498
                softLimitHeaderListSize));
×
499
        return;
×
500
      }
501

502
      if (!teWarningLogged && !TE_TRAILERS.contentEquals(headers.get(TE_HEADER))) {
1✔
503
        logger.warning(String.format("Expected header TE: %s, but %s is received. This means "
1✔
504
                + "some intermediate proxy may not support trailers",
505
            TE_TRAILERS, headers.get(TE_HEADER)));
1✔
506
        teWarningLogged = true;
1✔
507
      }
508

509
      // The Http2Stream object was put by AbstractHttp2ConnectionHandler before calling this
510
      // method.
511
      Http2Stream http2Stream = requireHttp2Stream(streamId);
1✔
512

513
      Metadata metadata = Utils.convertHeaders(headers);
1✔
514
      StatsTraceContext statsTraceCtx =
1✔
515
          StatsTraceContext.newServerContext(streamTracerFactories, method, metadata);
1✔
516

517
      NettyServerStream.TransportState state = new NettyServerStream.TransportState(
1✔
518
          this,
519
          ctx.channel().eventLoop(),
1✔
520
          http2Stream,
521
          maxMessageSize,
522
          statsTraceCtx,
523
          transportTracer,
524
          method);
525

526
      try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.onHeadersRead")) {
1✔
527
        PerfMark.attachTag(state.tag());
1✔
528
        String authority = getOrUpdateAuthority((AsciiString) headers.authority());
1✔
529
        NettyServerStream stream = new NettyServerStream(
1✔
530
            ctx.channel(),
1✔
531
            state,
532
            attributes,
533
            authority,
534
            statsTraceCtx);
535
        transportListener.streamCreated(stream, method, metadata);
1✔
536
        state.onStreamAllocated();
1✔
537
        http2Stream.setProperty(streamKey, state);
1✔
538
      }
539
    } catch (Exception e) {
×
540
      logger.log(Level.WARNING, "Exception in onHeadersRead()", e);
×
541
      // Throw an exception that will get handled by onStreamError.
542
      throw newStreamException(streamId, e);
×
543
    }
1✔
544
  }
1✔
545

546
  private String getOrUpdateAuthority(AsciiString authority) {
547
    if (authority == null) {
1✔
548
      return null;
1✔
549
    } else if (!authority.equals(lastKnownAuthority)) {
1✔
550
      lastKnownAuthority = authority;
1✔
551
    }
552

553
    // AsciiString.toString() is internally cached, so subsequent calls will not
554
    // result in recomputing the String representation of lastKnownAuthority.
555
    return lastKnownAuthority.toString();
1✔
556
  }
557

558
  private void onDataRead(int streamId, ByteBuf data, int padding, boolean endOfStream)
559
      throws Http2Exception {
560
    flowControlPing().onDataRead(data.readableBytes(), padding);
1✔
561
    try {
562
      NettyServerStream.TransportState stream = serverStream(requireHttp2Stream(streamId));
1✔
563
      if (stream == null) {
1✔
564
        return;
1✔
565
      }
566
      try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.onDataRead")) {
1✔
567
        PerfMark.attachTag(stream.tag());
1✔
568
        stream.inboundDataReceived(data, endOfStream);
1✔
569
      }
570
    } catch (Throwable e) {
×
571
      logger.log(Level.WARNING, "Exception in onDataRead()", e);
×
572
      // Throw an exception that will get handled by onStreamError.
573
      throw newStreamException(streamId, e);
×
574
    }
1✔
575
  }
1✔
576

577
  private void onRstStreamRead(int streamId, long errorCode) throws Http2Exception {
578
    if (maxRstCount > 0) {
1✔
579
      long now = ticker.read();
1✔
580
      if (now - lastRstNanoTime > maxRstPeriodNanos) {
1✔
581
        lastRstNanoTime = now;
1✔
582
        rstCount = 1;
1✔
583
      } else {
584
        rstCount++;
1✔
585
        if (rstCount > maxRstCount) {
1✔
586
          throw new Http2Exception(Http2Error.ENHANCE_YOUR_CALM, "too_many_rststreams") {
1✔
587
            @SuppressWarnings("UnsynchronizedOverridesSynchronized") // No memory accesses
588
            @Override
589
            public Throwable fillInStackTrace() {
590
              // Avoid the CPU cycles, since the resets may be a CPU consumption attack
591
              return this;
1✔
592
            }
593
          };
594
        }
595
      }
596
    }
597

598
    try {
599
      NettyServerStream.TransportState stream = serverStream(connection().stream(streamId));
1✔
600
      if (stream != null) {
1✔
601
        try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.onRstStreamRead")) {
1✔
602
          PerfMark.attachTag(stream.tag());
1✔
603
          stream.transportReportStatus(
1✔
604
              Status.CANCELLED.withDescription("RST_STREAM received for code " + errorCode));
1✔
605
        }
606
      }
607
    } catch (Throwable e) {
1✔
608
      logger.log(Level.WARNING, "Exception in onRstStreamRead()", e);
1✔
609
      // Throw an exception that will get handled by onStreamError.
610
      throw newStreamException(streamId, e);
1✔
611
    }
1✔
612
  }
1✔
613

614
  @Override
615
  protected void onConnectionError(ChannelHandlerContext ctx, boolean outbound, Throwable cause,
616
      Http2Exception http2Ex) {
617
    logger.log(Level.FINE, "Connection Error", cause);
1✔
618
    connectionError = cause;
1✔
619
    super.onConnectionError(ctx, outbound, cause, http2Ex);
1✔
620
  }
1✔
621

622
  @Override
623
  protected void onStreamError(ChannelHandlerContext ctx, boolean outbound, Throwable cause,
624
      StreamException http2Ex) {
625
    NettyServerStream.TransportState serverStream = serverStream(
1✔
626
        connection().stream(Http2Exception.streamId(http2Ex)));
1✔
627
    Level level = Level.WARNING;
1✔
628
    if (serverStream == null && http2Ex.error() == Http2Error.STREAM_CLOSED) {
1✔
629
      level = Level.FINE;
×
630
    }
631
    logger.log(level, "Stream Error", cause);
1✔
632
    Tag tag = serverStream != null ? serverStream.tag() : PerfMark.createTag();
1✔
633
    try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.onStreamError")) {
1✔
634
      PerfMark.attachTag(tag);
1✔
635
      if (serverStream != null) {
1✔
636
        serverStream.transportReportStatus(Utils.statusFromThrowable(cause));
1✔
637
      }
638
      // TODO(ejona): Abort the stream by sending headers to help the client with debugging.
639
      // Delegate to the base class to send a RST_STREAM.
640
      super.onStreamError(ctx, outbound, cause, http2Ex);
1✔
641
    }
642
  }
1✔
643

644
  @Override
645
  public void handleProtocolNegotiationCompleted(
646
      Attributes attrs, InternalChannelz.Security securityInfo) {
647
    negotiationAttributes = attrs;
1✔
648
    this.securityInfo = securityInfo;
1✔
649
    super.handleProtocolNegotiationCompleted(attrs, securityInfo);
1✔
650
    NettyClientHandler.writeBufferingAndRemove(ctx().channel());
1✔
651
  }
1✔
652

653
  @Override
654
  public Attributes getEagAttributes() {
655
    return eagAttributes;
1✔
656
  }
657

658
  InternalChannelz.Security getSecurityInfo() {
659
    return securityInfo;
1✔
660
  }
661

662
  @VisibleForTesting
663
  KeepAliveManager getKeepAliveManagerForTest() {
664
    return keepAliveManager;
1✔
665
  }
666

667
  @VisibleForTesting
668
  void setKeepAliveManagerForTest(KeepAliveManager keepAliveManager) {
669
    this.keepAliveManager = keepAliveManager;
1✔
670
  }
1✔
671

672
  /**
673
   * Handler for the Channel shutting down.
674
   */
675
  @Override
676
  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
677
    try {
678
      if (keepAliveManager != null) {
1✔
679
        keepAliveManager.onTransportTermination();
1✔
680
      }
681
      if (maxConnectionIdleManager != null) {
1✔
682
        maxConnectionIdleManager.onTransportTermination();
1✔
683
      }
684
      if (maxConnectionAgeMonitor != null) {
1✔
685
        maxConnectionAgeMonitor.cancel(false);
1✔
686
      }
687
      final Status status =
1✔
688
          Status.UNAVAILABLE.withDescription("connection terminated for unknown reason");
1✔
689
      // Any streams that are still active must be closed
690
      connection().forEachActiveStream(new Http2StreamVisitor() {
1✔
691
        @Override
692
        public boolean visit(Http2Stream stream) throws Http2Exception {
693
          NettyServerStream.TransportState serverStream = serverStream(stream);
1✔
694
          if (serverStream != null) {
1✔
695
            serverStream.transportReportStatus(status);
1✔
696
          }
697
          return true;
1✔
698
        }
699
      });
700
    } finally {
701
      super.channelInactive(ctx);
1✔
702
    }
703
  }
1✔
704

705
  WriteQueue getWriteQueue() {
706
    return serverWriteQueue;
1✔
707
  }
708

709
  /** Handler for commands sent from the stream. */
710
  @Override
711
  public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
712
      throws Exception {
713
    if (msg instanceof SendGrpcFrameCommand) {
1✔
714
      sendGrpcFrame(ctx, (SendGrpcFrameCommand) msg, promise);
1✔
715
    } else if (msg instanceof SendResponseHeadersCommand) {
1✔
716
      sendResponseHeaders(ctx, (SendResponseHeadersCommand) msg, promise);
1✔
717
    } else if (msg instanceof CancelServerStreamCommand) {
1✔
718
      cancelStream(ctx, (CancelServerStreamCommand) msg, promise);
1✔
719
    } else if (msg instanceof GracefulServerCloseCommand) {
1✔
720
      gracefulClose(ctx, (GracefulServerCloseCommand) msg, promise);
1✔
721
    } else if (msg instanceof ForcefulCloseCommand) {
1✔
722
      forcefulClose(ctx, (ForcefulCloseCommand) msg, promise);
1✔
723
    } else {
724
      AssertionError e =
×
725
          new AssertionError("Write called for unexpected type: " + msg.getClass().getName());
×
726
      ReferenceCountUtil.release(msg);
×
727
      promise.setFailure(e);
×
728
      throw e;
×
729
    }
730
  }
1✔
731

732
  @Override
733
  public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
734
    gracefulClose(ctx, new GracefulServerCloseCommand("app_requested"), promise);
1✔
735
    ctx.flush();
1✔
736
  }
1✔
737

738
  /**
739
   * Returns the given processed bytes back to inbound flow control.
740
   */
741
  void returnProcessedBytes(Http2Stream http2Stream, int bytes) {
742
    try {
743
      decoder().flowController().consumeBytes(http2Stream, bytes);
1✔
744
    } catch (Http2Exception e) {
×
745
      throw new RuntimeException(e);
×
746
    }
1✔
747
  }
1✔
748

749
  private void closeStreamWhenDone(ChannelPromise promise, Http2Stream stream) {
750
    promise.addListener(
1✔
751
        new ChannelFutureListener() {
1✔
752
          @Override
753
          public void operationComplete(ChannelFuture future) {
754
            serverStream(stream).complete();
1✔
755
          }
1✔
756
        });
757
  }
1✔
758

759
  private static void streamGone(int streamId, ChannelPromise promise) {
760
    promise.setFailure(
1✔
761
        new IllegalStateException(
762
            "attempting to write to stream " + streamId + " that no longer exists") {
1✔
763
          @Override
764
          public synchronized Throwable fillInStackTrace() {
765
            return this;
1✔
766
          }
767
        });
768
  }
1✔
769

770
  /** Sends the given gRPC frame to the client. */
771
  private void sendGrpcFrame(
772
      ChannelHandlerContext ctx, SendGrpcFrameCommand cmd, ChannelPromise promise)
773
      throws Http2Exception {
774
    try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.sendGrpcFrame")) {
1✔
775
      PerfMark.attachTag(cmd.stream().tag());
1✔
776
      PerfMark.linkIn(cmd.getLink());
1✔
777
      int streamId = cmd.stream().id();
1✔
778
      Http2Stream stream = connection().stream(streamId);
1✔
779
      if (stream == null) {
1✔
780
        cmd.release();
1✔
781
        streamGone(streamId, promise);
1✔
782
        return;
1✔
783
      }
784
      if (cmd.endStream()) {
1✔
785
        closeStreamWhenDone(promise, stream);
×
786
      }
787
      // Call the base class to write the HTTP/2 DATA frame.
788
      encoder().writeData(ctx, streamId, cmd.content(), 0, cmd.endStream(), promise);
1✔
789
    }
790
  }
1✔
791

792
  /**
793
   * Sends the response headers to the client.
794
   */
795
  private void sendResponseHeaders(ChannelHandlerContext ctx, SendResponseHeadersCommand cmd,
796
      ChannelPromise promise) throws Http2Exception {
797
    try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.sendResponseHeaders")) {
1✔
798
      PerfMark.attachTag(cmd.stream().tag());
1✔
799
      PerfMark.linkIn(cmd.getLink());
1✔
800
      int streamId = cmd.stream().id();
1✔
801
      Http2Stream stream = connection().stream(streamId);
1✔
802
      if (stream == null) {
1✔
803
        streamGone(streamId, promise);
1✔
804
        return;
1✔
805
      }
806
      if (cmd.endOfStream()) {
1✔
807
        closeStreamWhenDone(promise, stream);
1✔
808
      }
809
      encoder().writeHeaders(ctx, streamId, cmd.headers(), 0, cmd.endOfStream(), promise);
1✔
810
    }
811
  }
1✔
812

813
  private void cancelStream(ChannelHandlerContext ctx, CancelServerStreamCommand cmd,
814
      ChannelPromise promise) {
815
    try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.cancelStream")) {
1✔
816
      PerfMark.attachTag(cmd.stream().tag());
1✔
817
      PerfMark.linkIn(cmd.getLink());
1✔
818
      // Notify the listener if we haven't already.
819
      cmd.stream().transportReportStatus(cmd.reason());
1✔
820

821
      // Now we need to decide how we're going to notify the peer that this stream is closed.
822
      // If possible, it's nice to inform the peer _why_ this stream was cancelled by sending
823
      // a structured headers frame.
824
      if (shouldCloseStreamWithHeaders(cmd, connection())) {
1✔
825
        Metadata md = new Metadata();
1✔
826
        md.put(InternalStatus.CODE_KEY, cmd.reason());
1✔
827
        if (cmd.reason().getDescription() != null) {
1✔
828
          md.put(InternalStatus.MESSAGE_KEY, cmd.reason().getDescription());
1✔
829
        }
830
        Http2Headers headers = Utils.convertServerHeaders(md);
1✔
831
        encoder().writeHeaders(
1✔
832
            ctx, cmd.stream().id(), headers, /* padding = */ 0, /* endStream = */ true, promise);
1✔
833
      } else {
1✔
834
        // Terminate the stream.
835
        encoder().writeRstStream(ctx, cmd.stream().id(), Http2Error.CANCEL.code(), promise);
1✔
836
      }
837
    }
838
  }
1✔
839

840
  // Determine whether a CancelServerStreamCommand should try to close the stream with a
841
  // HEADERS or a RST_STREAM frame. The caller has some influence over this (they can
842
  // configure cmd.wantsHeaders()). The state of the stream also has an influence: we
843
  // only try to send HEADERS if the stream exists and hasn't already sent any headers.
844
  private static boolean shouldCloseStreamWithHeaders(
845
          CancelServerStreamCommand cmd, Http2Connection conn) {
846
    if (!cmd.wantsHeaders()) {
1✔
847
      return false;
1✔
848
    }
849
    Http2Stream stream = conn.stream(cmd.stream().id());
1✔
850
    return stream != null && !stream.isHeadersSent();
1✔
851
  }
852

853
  private void gracefulClose(final ChannelHandlerContext ctx, final GracefulServerCloseCommand msg,
854
      ChannelPromise promise) throws Exception {
855
    // Ideally we'd adjust a pre-existing graceful shutdown's grace period to at least what is
856
    // requested here. But that's an edge case and seems bug-prone.
857
    if (gracefulShutdown == null) {
1✔
858
      Long graceTimeInNanos = null;
1✔
859
      if (msg.getGraceTimeUnit() != null) {
1✔
860
        graceTimeInNanos = msg.getGraceTimeUnit().toNanos(msg.getGraceTime());
1✔
861
      }
862
      gracefulShutdown = new GracefulShutdown(msg.getGoAwayDebugString(), graceTimeInNanos);
1✔
863
      gracefulShutdown.start(ctx);
1✔
864
    }
865
    promise.setSuccess();
1✔
866
  }
1✔
867

868
  private void forcefulClose(final ChannelHandlerContext ctx, final ForcefulCloseCommand msg,
869
      ChannelPromise promise) throws Exception {
870
    super.close(ctx, promise);
1✔
871
    connection().forEachActiveStream(new Http2StreamVisitor() {
1✔
872
      @Override
873
      public boolean visit(Http2Stream stream) throws Http2Exception {
874
        NettyServerStream.TransportState serverStream = serverStream(stream);
1✔
875
        if (serverStream != null) {
1✔
876
          try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.forcefulClose")) {
1✔
877
            PerfMark.attachTag(serverStream.tag());
1✔
878
            PerfMark.linkIn(msg.getLink());
1✔
879
            serverStream.transportReportStatus(msg.getStatus());
1✔
880
            resetStream(ctx, stream.id(), Http2Error.CANCEL.code(), ctx.newPromise());
1✔
881
          }
882
        }
883
        stream.close();
1✔
884
        return true;
1✔
885
      }
886
    });
887
  }
1✔
888

889
  private void respondWithHttpError(
890
      ChannelHandlerContext ctx, int streamId, int code, Status.Code statusCode, String msg) {
891
    Metadata metadata = new Metadata();
1✔
892
    metadata.put(InternalStatus.CODE_KEY, statusCode.toStatus());
1✔
893
    metadata.put(InternalStatus.MESSAGE_KEY, msg);
1✔
894
    byte[][] serialized = InternalMetadata.serialize(metadata);
1✔
895

896
    Http2Headers headers = new DefaultHttp2Headers(true, serialized.length / 2)
1✔
897
        .status("" + code)
1✔
898
        .set(CONTENT_TYPE_HEADER, "text/plain; charset=utf-8");
1✔
899
    for (int i = 0; i < serialized.length; i += 2) {
1✔
900
      headers.add(new AsciiString(serialized[i], false), new AsciiString(serialized[i + 1], false));
1✔
901
    }
902
    encoder().writeHeaders(ctx, streamId, headers, 0, false, ctx.newPromise());
1✔
903
    ByteBuf msgBuf = ByteBufUtil.writeUtf8(ctx.alloc(), msg);
1✔
904
    encoder().writeData(ctx, streamId, msgBuf, 0, true, ctx.newPromise());
1✔
905
  }
1✔
906

907
  private Http2Stream requireHttp2Stream(int streamId) {
908
    Http2Stream stream = connection().stream(streamId);
1✔
909
    if (stream == null) {
1✔
910
      // This should never happen.
911
      throw new AssertionError("Stream does not exist: " + streamId);
×
912
    }
913
    return stream;
1✔
914
  }
915

916
  /**
917
   * Returns the server stream associated to the given HTTP/2 stream object.
918
   */
919
  private NettyServerStream.TransportState serverStream(Http2Stream stream) {
920
    return stream == null ? null : (NettyServerStream.TransportState) stream.getProperty(streamKey);
1✔
921
  }
922

923
  private Http2Exception newStreamException(int streamId, Throwable cause) {
924
    return Http2Exception.streamError(
1✔
925
        streamId, Http2Error.INTERNAL_ERROR, cause, Strings.nullToEmpty(cause.getMessage()));
1✔
926
  }
927

928
  private class FrameListener extends Http2FrameAdapter {
1✔
929
    private boolean firstSettings = true;
1✔
930

931
    @Override
932
    public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) {
933
      if (firstSettings) {
1✔
934
        firstSettings = false;
1✔
935
        // Delay transportReady until we see the client's HTTP handshake, for coverage with
936
        // handshakeTimeout
937
        attributes = transportListener.transportReady(negotiationAttributes);
1✔
938
      }
939
    }
1✔
940

941
    @Override
942
    public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
943
        boolean endOfStream) throws Http2Exception {
944
      if (keepAliveManager != null) {
1✔
945
        keepAliveManager.onDataReceived();
1✔
946
      }
947
      NettyServerHandler.this.onDataRead(streamId, data, padding, endOfStream);
1✔
948
      return padding;
1✔
949
    }
950

951
    @Override
952
    public void onHeadersRead(ChannelHandlerContext ctx,
953
        int streamId,
954
        Http2Headers headers,
955
        int streamDependency,
956
        short weight,
957
        boolean exclusive,
958
        int padding,
959
        boolean endStream) throws Http2Exception {
960
      if (keepAliveManager != null) {
1✔
961
        keepAliveManager.onDataReceived();
1✔
962
      }
963
      NettyServerHandler.this.onHeadersRead(ctx, streamId, headers);
1✔
964
      if (endStream) {
1✔
965
        NettyServerHandler.this.onDataRead(streamId, Unpooled.EMPTY_BUFFER, 0, endStream);
×
966
      }
967
    }
1✔
968

969
    @Override
970
    public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode)
971
        throws Http2Exception {
972
      if (keepAliveManager != null) {
1✔
973
        keepAliveManager.onDataReceived();
1✔
974
      }
975
      NettyServerHandler.this.onRstStreamRead(streamId, errorCode);
1✔
976
    }
1✔
977

978
    @Override
979
    public void onPingRead(ChannelHandlerContext ctx, long data) throws Http2Exception {
980
      if (keepAliveManager != null) {
1✔
981
        keepAliveManager.onDataReceived();
1✔
982
      }
983
      if (!keepAliveEnforcer.pingAcceptable()) {
1✔
984
        ByteBuf debugData = ByteBufUtil.writeAscii(ctx.alloc(), "too_many_pings");
1✔
985
        goAway(ctx, connection().remote().lastStreamCreated(), Http2Error.ENHANCE_YOUR_CALM.code(),
1✔
986
            debugData, ctx.newPromise());
1✔
987
        Status status = Status.RESOURCE_EXHAUSTED.withDescription("Too many pings from client");
1✔
988
        try {
989
          forcefulClose(ctx, new ForcefulCloseCommand(status), ctx.newPromise());
1✔
990
        } catch (Exception ex) {
×
991
          onError(ctx, /* outbound= */ true, ex);
×
992
        }
1✔
993
      }
994
    }
1✔
995

996
    @Override
997
    public void onPingAckRead(ChannelHandlerContext ctx, long data) throws Http2Exception {
998
      if (keepAliveManager != null) {
1✔
999
        keepAliveManager.onDataReceived();
1✔
1000
      }
1001
      if (data == flowControlPing().payload()) {
1✔
1002
        flowControlPing().updateWindow();
1✔
1003
        logger.log(Level.FINE, "Window: {0}",
1✔
1004
            decoder().flowController().initialWindowSize(connection().connectionStream()));
1✔
1005
      } else if (data == GRACEFUL_SHUTDOWN_PING) {
1✔
1006
        if (gracefulShutdown == null) {
1✔
1007
          // this should never happen
1008
          logger.warning("Received GRACEFUL_SHUTDOWN_PING Ack but gracefulShutdown is null");
×
1009
        } else {
1010
          gracefulShutdown.secondGoAwayAndClose(ctx);
1✔
1011
        }
1012
      } else if (data != KEEPALIVE_PING) {
1✔
1013
        logger.warning("Received unexpected ping ack. No ping outstanding");
×
1014
      }
1015
    }
1✔
1016
  }
1017

1018
  private final class KeepAlivePinger implements KeepAliveManager.KeepAlivePinger {
1019
    final ChannelHandlerContext ctx;
1020

1021
    KeepAlivePinger(ChannelHandlerContext ctx) {
1✔
1022
      this.ctx = ctx;
1✔
1023
    }
1✔
1024

1025
    @Override
1026
    public void ping() {
1027
      ChannelFuture pingFuture = encoder().writePing(
1✔
1028
          ctx, false /* isAck */, KEEPALIVE_PING, ctx.newPromise());
1✔
1029
      ctx.flush();
1✔
1030
      pingFuture.addListener(new ChannelFutureListener() {
1✔
1031
        @Override
1032
        public void operationComplete(ChannelFuture future) throws Exception {
1033
          if (future.isSuccess()) {
1✔
1034
            transportTracer.reportKeepAliveSent();
1✔
1035
          }
1036
        }
1✔
1037
      });
1038
    }
1✔
1039

1040
    @Override
1041
    public void onPingTimeout() {
1042
      try {
1043
        forcefulClose(
1✔
1044
            ctx,
1045
            new ForcefulCloseCommand(Status.UNAVAILABLE
1046
                .withDescription("Keepalive failed. The connection is likely gone")),
1✔
1047
            ctx.newPromise());
1✔
1048
      } catch (Exception ex) {
×
1049
        try {
1050
          exceptionCaught(ctx, ex);
×
1051
        } catch (Exception ex2) {
×
1052
          logger.log(Level.WARNING, "Exception while propagating exception", ex2);
×
1053
          logger.log(Level.WARNING, "Original failure", ex);
×
1054
        }
×
1055
      }
1✔
1056
    }
1✔
1057
  }
1058

1059
  private final class GracefulShutdown {
1060
    String goAwayMessage;
1061

1062
    /**
1063
     * The grace time between starting graceful shutdown and closing the netty channel,
1064
     * {@code null} is unspecified.
1065
     */
1066
    @CheckForNull
1067
    Long graceTimeInNanos;
1068

1069
    /**
1070
     * True if ping is Acked or ping is timeout.
1071
     */
1072
    boolean pingAckedOrTimeout;
1073

1074
    Future<?> pingFuture;
1075

1076
    GracefulShutdown(String goAwayMessage,
1077
        @Nullable Long graceTimeInNanos) {
1✔
1078
      this.goAwayMessage = goAwayMessage;
1✔
1079
      this.graceTimeInNanos = graceTimeInNanos;
1✔
1080
    }
1✔
1081

1082
    /**
1083
     * Sends out first GOAWAY and ping, and schedules second GOAWAY and close.
1084
     */
1085
    void start(final ChannelHandlerContext ctx) {
1086
      goAway(
1✔
1087
          ctx,
1088
          Integer.MAX_VALUE,
1089
          Http2Error.NO_ERROR.code(),
1✔
1090
          ByteBufUtil.writeAscii(ctx.alloc(), goAwayMessage),
1✔
1091
          ctx.newPromise());
1✔
1092

1093
      pingFuture = ctx.executor().schedule(
1✔
1094
          new Runnable() {
1✔
1095
            @Override
1096
            public void run() {
1097
              secondGoAwayAndClose(ctx);
1✔
1098
            }
1✔
1099
          },
1100
          GRACEFUL_SHUTDOWN_PING_TIMEOUT_NANOS,
1✔
1101
          TimeUnit.NANOSECONDS);
1102

1103
      encoder().writePing(ctx, false /* isAck */, GRACEFUL_SHUTDOWN_PING, ctx.newPromise());
1✔
1104
    }
1✔
1105

1106
    void secondGoAwayAndClose(ChannelHandlerContext ctx) {
1107
      if (pingAckedOrTimeout) {
1✔
1108
        return;
×
1109
      }
1110
      pingAckedOrTimeout = true;
1✔
1111

1112
      checkNotNull(pingFuture, "pingFuture");
1✔
1113
      pingFuture.cancel(false);
1✔
1114

1115
      // send the second GOAWAY with last stream id
1116
      goAway(
1✔
1117
          ctx,
1118
          connection().remote().lastStreamCreated(),
1✔
1119
          Http2Error.NO_ERROR.code(),
1✔
1120
          ByteBufUtil.writeAscii(ctx.alloc(), goAwayMessage),
1✔
1121
          ctx.newPromise());
1✔
1122

1123
      // gracefully shutdown with specified grace time
1124
      long savedGracefulShutdownTimeMillis = gracefulShutdownTimeoutMillis();
1✔
1125
      long overriddenGraceTime = graceTimeOverrideMillis(savedGracefulShutdownTimeMillis);
1✔
1126
      try {
1127
        gracefulShutdownTimeoutMillis(overriddenGraceTime);
1✔
1128
        NettyServerHandler.super.close(ctx, ctx.newPromise());
1✔
1129
      } catch (Exception e) {
×
1130
        onError(ctx, /* outbound= */ true, e);
×
1131
      } finally {
1132
        gracefulShutdownTimeoutMillis(savedGracefulShutdownTimeMillis);
1✔
1133
      }
1134
    }
1✔
1135

1136
    private long graceTimeOverrideMillis(long originalMillis) {
1137
      if (graceTimeInNanos == null) {
1✔
1138
        return originalMillis;
1✔
1139
      }
1140
      if (graceTimeInNanos == MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE) {
1✔
1141
        // netty treats -1 as "no timeout"
1142
        return -1L;
1✔
1143
      }
1144
      return TimeUnit.NANOSECONDS.toMillis(graceTimeInNanos);
1✔
1145
    }
1146
  }
1147

1148
  // Use a frame writer so that we know when frames are through flow control and actually being
1149
  // written.
1150
  private static class WriteMonitoringFrameWriter extends DecoratingHttp2FrameWriter {
1151
    private final KeepAliveEnforcer keepAliveEnforcer;
1152

1153
    public WriteMonitoringFrameWriter(Http2FrameWriter delegate,
1154
        KeepAliveEnforcer keepAliveEnforcer) {
1155
      super(delegate);
1✔
1156
      this.keepAliveEnforcer = keepAliveEnforcer;
1✔
1157
    }
1✔
1158

1159
    @Override
1160
    public ChannelFuture writeData(ChannelHandlerContext ctx, int streamId, ByteBuf data,
1161
        int padding, boolean endStream, ChannelPromise promise) {
1162
      keepAliveEnforcer.resetCounters();
1✔
1163
      return super.writeData(ctx, streamId, data, padding, endStream, promise);
1✔
1164
    }
1165

1166
    @Override
1167
    public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
1168
        int padding, boolean endStream, ChannelPromise promise) {
1169
      keepAliveEnforcer.resetCounters();
1✔
1170
      return super.writeHeaders(ctx, streamId, headers, padding, endStream, promise);
1✔
1171
    }
1172

1173
    @Override
1174
    public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
1175
        int streamDependency, short weight, boolean exclusive, int padding, boolean endStream,
1176
        ChannelPromise promise) {
1177
      keepAliveEnforcer.resetCounters();
×
1178
      return super.writeHeaders(ctx, streamId, headers, streamDependency, weight, exclusive,
×
1179
          padding, endStream, promise);
1180
    }
1181
  }
1182

1183
  private static class ServerChannelLogger extends ChannelLogger {
1184
    private static final Logger log = Logger.getLogger(ChannelLogger.class.getName());
1✔
1185

1186
    @Override
1187
    public void log(ChannelLogLevel level, String message) {
1188
      log.log(toJavaLogLevel(level), message);
1✔
1189
    }
1✔
1190

1191
    @Override
1192
    public void log(ChannelLogLevel level, String messageFormat, Object... args) {
1193
      log(level, MessageFormat.format(messageFormat, args));
1✔
1194
    }
1✔
1195
  }
1196

1197
  private static Level toJavaLogLevel(ChannelLogLevel level) {
1198
    switch (level) {
1✔
1199
      case ERROR:
1200
        return Level.FINE;
×
1201
      case WARNING:
1202
        return Level.FINER;
×
1203
      default:
1204
        return Level.FINEST;
1✔
1205
    }
1206
  }
1207
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc