• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19990

19 Sep 2025 03:23PM UTC coverage: 88.547% (+0.04%) from 88.511%
#19990

push

github

ejona86
xds: Remove verify TODO for onResult2 error status

This had been accidentally left in 0c179e3f9.

Requesting a refresh is pretty close to RetryingNameResolver's behavior
of exponential backoff. While not identical, it is the closest we can
get easily.

34668 of 39152 relevant lines covered (88.55%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.93
/../netty/src/main/java/io/grpc/netty/NettyServerHandler.java
1
/*
2
 * Copyright 2014 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.netty;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static io.grpc.internal.GrpcUtil.SERVER_KEEPALIVE_TIME_NANOS_DISABLED;
22
import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE;
23
import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_AGE_NANOS_DISABLED;
24
import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_IDLE_NANOS_DISABLED;
25
import static io.grpc.netty.Utils.CONTENT_TYPE_HEADER;
26
import static io.grpc.netty.Utils.HTTP_METHOD;
27
import static io.grpc.netty.Utils.TE_HEADER;
28
import static io.grpc.netty.Utils.TE_TRAILERS;
29
import static io.netty.handler.codec.http.HttpHeaderNames.CONNECTION;
30
import static io.netty.handler.codec.http.HttpHeaderNames.HOST;
31
import static io.netty.handler.codec.http2.DefaultHttp2LocalFlowController.DEFAULT_WINDOW_UPDATE_RATIO;
32
import static io.netty.handler.codec.http2.Http2Headers.PseudoHeaderName.AUTHORITY;
33

34
import com.google.common.annotations.VisibleForTesting;
35
import com.google.common.base.Preconditions;
36
import com.google.common.base.Strings;
37
import com.google.common.base.Ticker;
38
import io.grpc.Attributes;
39
import io.grpc.ChannelLogger;
40
import io.grpc.ChannelLogger.ChannelLogLevel;
41
import io.grpc.InternalChannelz;
42
import io.grpc.InternalMetadata;
43
import io.grpc.InternalStatus;
44
import io.grpc.Metadata;
45
import io.grpc.ServerStreamTracer;
46
import io.grpc.Status;
47
import io.grpc.internal.GrpcUtil;
48
import io.grpc.internal.KeepAliveEnforcer;
49
import io.grpc.internal.KeepAliveManager;
50
import io.grpc.internal.LogExceptionRunnable;
51
import io.grpc.internal.MaxConnectionIdleManager;
52
import io.grpc.internal.ServerTransportListener;
53
import io.grpc.internal.StatsTraceContext;
54
import io.grpc.internal.TransportTracer;
55
import io.grpc.netty.GrpcHttp2HeadersUtils.GrpcHttp2ServerHeadersDecoder;
56
import io.netty.buffer.ByteBuf;
57
import io.netty.buffer.ByteBufUtil;
58
import io.netty.buffer.Unpooled;
59
import io.netty.channel.ChannelFuture;
60
import io.netty.channel.ChannelFutureListener;
61
import io.netty.channel.ChannelHandlerContext;
62
import io.netty.channel.ChannelPromise;
63
import io.netty.handler.codec.http.HttpHeaderNames;
64
import io.netty.handler.codec.http2.DecoratingHttp2ConnectionEncoder;
65
import io.netty.handler.codec.http2.DecoratingHttp2FrameWriter;
66
import io.netty.handler.codec.http2.DefaultHttp2Connection;
67
import io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder;
68
import io.netty.handler.codec.http2.DefaultHttp2ConnectionEncoder;
69
import io.netty.handler.codec.http2.DefaultHttp2FrameReader;
70
import io.netty.handler.codec.http2.DefaultHttp2FrameWriter;
71
import io.netty.handler.codec.http2.DefaultHttp2Headers;
72
import io.netty.handler.codec.http2.DefaultHttp2HeadersEncoder;
73
import io.netty.handler.codec.http2.DefaultHttp2LocalFlowController;
74
import io.netty.handler.codec.http2.DefaultHttp2RemoteFlowController;
75
import io.netty.handler.codec.http2.EmptyHttp2Headers;
76
import io.netty.handler.codec.http2.Http2Connection;
77
import io.netty.handler.codec.http2.Http2ConnectionAdapter;
78
import io.netty.handler.codec.http2.Http2ConnectionDecoder;
79
import io.netty.handler.codec.http2.Http2ConnectionEncoder;
80
import io.netty.handler.codec.http2.Http2Error;
81
import io.netty.handler.codec.http2.Http2Exception;
82
import io.netty.handler.codec.http2.Http2Exception.StreamException;
83
import io.netty.handler.codec.http2.Http2FrameAdapter;
84
import io.netty.handler.codec.http2.Http2FrameLogger;
85
import io.netty.handler.codec.http2.Http2FrameReader;
86
import io.netty.handler.codec.http2.Http2FrameWriter;
87
import io.netty.handler.codec.http2.Http2Headers;
88
import io.netty.handler.codec.http2.Http2HeadersDecoder;
89
import io.netty.handler.codec.http2.Http2HeadersEncoder;
90
import io.netty.handler.codec.http2.Http2InboundFrameLogger;
91
import io.netty.handler.codec.http2.Http2LifecycleManager;
92
import io.netty.handler.codec.http2.Http2OutboundFrameLogger;
93
import io.netty.handler.codec.http2.Http2Settings;
94
import io.netty.handler.codec.http2.Http2Stream;
95
import io.netty.handler.codec.http2.Http2StreamVisitor;
96
import io.netty.handler.codec.http2.UniformStreamByteDistributor;
97
import io.netty.handler.logging.LogLevel;
98
import io.netty.util.AsciiString;
99
import io.netty.util.ReferenceCountUtil;
100
import io.perfmark.PerfMark;
101
import io.perfmark.Tag;
102
import io.perfmark.TaskCloseable;
103
import java.text.MessageFormat;
104
import java.util.List;
105
import java.util.concurrent.Future;
106
import java.util.concurrent.ScheduledFuture;
107
import java.util.concurrent.TimeUnit;
108
import java.util.logging.Level;
109
import java.util.logging.Logger;
110
import javax.annotation.CheckForNull;
111
import javax.annotation.Nullable;
112

113
/**
114
 * Server-side Netty handler for GRPC processing. All event handlers are executed entirely within
115
 * the context of the Netty Channel thread.
116
 */
117
class NettyServerHandler extends AbstractNettyHandler {
118
  private static final Logger logger = Logger.getLogger(NettyServerHandler.class.getName());
1✔
119
  private static final long KEEPALIVE_PING = 0xDEADL;
120
  @VisibleForTesting
121
  static final long GRACEFUL_SHUTDOWN_PING = 0x97ACEF001L;
122
  private static final long GRACEFUL_SHUTDOWN_PING_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(10);
1✔
123
  /** Temporary workaround for #8674. Fine to delete after v1.45 release, and maybe earlier. */
124
  private static final boolean DISABLE_CONNECTION_HEADER_CHECK = Boolean.parseBoolean(
1✔
125
      System.getProperty("io.grpc.netty.disableConnectionHeaderCheck", "false"));
1✔
126

127
  private final Http2Connection.PropertyKey streamKey;
128
  private final ServerTransportListener transportListener;
129
  private final int maxMessageSize;
130
  private final long keepAliveTimeInNanos;
131
  private final long keepAliveTimeoutInNanos;
132
  private final long maxConnectionAgeInNanos;
133
  private final long maxConnectionAgeGraceInNanos;
134
  private final RstStreamCounter rstStreamCounter;
135
  private final List<? extends ServerStreamTracer.Factory> streamTracerFactories;
136
  private final TransportTracer transportTracer;
137
  private final KeepAliveEnforcer keepAliveEnforcer;
138
  private final Attributes eagAttributes;
139
  /** Incomplete attributes produced by negotiator. */
140
  private Attributes negotiationAttributes;
141
  private InternalChannelz.Security securityInfo;
142
  /** Completed attributes produced by transportReady. */
143
  private Attributes attributes;
144
  private Throwable connectionError;
145
  private boolean teWarningLogged;
146
  private WriteQueue serverWriteQueue;
147
  private AsciiString lastKnownAuthority;
148
  @CheckForNull
149
  private KeepAliveManager keepAliveManager;
150
  @CheckForNull
151
  private MaxConnectionIdleManager maxConnectionIdleManager;
152
  @CheckForNull
153
  private ScheduledFuture<?> maxConnectionAgeMonitor;
154
  @CheckForNull
155
  private GracefulShutdown gracefulShutdown;
156

157
  static NettyServerHandler newHandler(
158
      ServerTransportListener transportListener,
159
      ChannelPromise channelUnused,
160
      List<? extends ServerStreamTracer.Factory> streamTracerFactories,
161
      TransportTracer transportTracer,
162
      int maxStreams,
163
      boolean autoFlowControl,
164
      int flowControlWindow,
165
      int maxHeaderListSize,
166
      int softLimitHeaderListSize,
167
      int maxMessageSize,
168
      long keepAliveTimeInNanos,
169
      long keepAliveTimeoutInNanos,
170
      long maxConnectionIdleInNanos,
171
      long maxConnectionAgeInNanos,
172
      long maxConnectionAgeGraceInNanos,
173
      boolean permitKeepAliveWithoutCalls,
174
      long permitKeepAliveTimeInNanos,
175
      int maxRstCount,
176
      long maxRstPeriodNanos,
177
      Attributes eagAttributes) {
178
    Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive: %s",
1✔
179
        maxHeaderListSize);
180
    Http2FrameLogger frameLogger = new Http2FrameLogger(LogLevel.DEBUG, NettyServerHandler.class);
1✔
181
    Http2HeadersDecoder headersDecoder = new GrpcHttp2ServerHeadersDecoder(maxHeaderListSize);
1✔
182
    Http2FrameReader frameReader = new Http2InboundFrameLogger(
1✔
183
        new DefaultHttp2FrameReader(headersDecoder), frameLogger);
184
    Http2HeadersEncoder encoder = new DefaultHttp2HeadersEncoder(
1✔
185
        Http2HeadersEncoder.NEVER_SENSITIVE, false, 16, Integer.MAX_VALUE);
186
    Http2FrameWriter frameWriter =
1✔
187
        new Http2OutboundFrameLogger(new DefaultHttp2FrameWriter(encoder), frameLogger);
188
    return newHandler(
1✔
189
        channelUnused,
190
        frameReader,
191
        frameWriter,
192
        transportListener,
193
        streamTracerFactories,
194
        transportTracer,
195
        maxStreams,
196
        autoFlowControl,
197
        flowControlWindow,
198
        maxHeaderListSize,
199
        softLimitHeaderListSize,
200
        maxMessageSize,
201
        keepAliveTimeInNanos,
202
        keepAliveTimeoutInNanos,
203
        maxConnectionIdleInNanos,
204
        maxConnectionAgeInNanos,
205
        maxConnectionAgeGraceInNanos,
206
        permitKeepAliveWithoutCalls,
207
        permitKeepAliveTimeInNanos,
208
        maxRstCount,
209
        maxRstPeriodNanos,
210
        eagAttributes,
211
        Ticker.systemTicker());
1✔
212
  }
213

214
  static NettyServerHandler newHandler(
215
      ChannelPromise channelUnused,
216
      Http2FrameReader frameReader,
217
      Http2FrameWriter frameWriter,
218
      ServerTransportListener transportListener,
219
      List<? extends ServerStreamTracer.Factory> streamTracerFactories,
220
      TransportTracer transportTracer,
221
      int maxStreams,
222
      boolean autoFlowControl,
223
      int flowControlWindow,
224
      int maxHeaderListSize,
225
      int softLimitHeaderListSize,
226
      int maxMessageSize,
227
      long keepAliveTimeInNanos,
228
      long keepAliveTimeoutInNanos,
229
      long maxConnectionIdleInNanos,
230
      long maxConnectionAgeInNanos,
231
      long maxConnectionAgeGraceInNanos,
232
      boolean permitKeepAliveWithoutCalls,
233
      long permitKeepAliveTimeInNanos,
234
      int maxRstCount,
235
      long maxRstPeriodNanos,
236
      Attributes eagAttributes,
237
      Ticker ticker) {
238
    Preconditions.checkArgument(maxStreams > 0, "maxStreams must be positive: %s", maxStreams);
1✔
239
    Preconditions.checkArgument(flowControlWindow > 0, "flowControlWindow must be positive: %s",
1✔
240
        flowControlWindow);
241
    Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive: %s",
1✔
242
        maxHeaderListSize);
243
    Preconditions.checkArgument(
1✔
244
        softLimitHeaderListSize > 0, "softLimitHeaderListSize must be positive: %s",
245
        softLimitHeaderListSize);
246
    Preconditions.checkArgument(maxMessageSize > 0, "maxMessageSize must be positive: %s",
1✔
247
        maxMessageSize);
248

249
    final Http2Connection connection = new DefaultHttp2Connection(true);
1✔
250
    UniformStreamByteDistributor dist = new UniformStreamByteDistributor(connection);
1✔
251
    dist.minAllocationChunk(MIN_ALLOCATED_CHUNK); // Increased for benchmarks performance.
1✔
252
    DefaultHttp2RemoteFlowController controller =
1✔
253
        new DefaultHttp2RemoteFlowController(connection, dist);
254
    connection.remote().flowController(controller);
1✔
255
    final KeepAliveEnforcer keepAliveEnforcer = new KeepAliveEnforcer(
1✔
256
        permitKeepAliveWithoutCalls, permitKeepAliveTimeInNanos, TimeUnit.NANOSECONDS);
257

258
    if (ticker == null) {
1✔
259
      ticker = Ticker.systemTicker();
×
260
    }
261

262
    RstStreamCounter rstStreamCounter
1✔
263
        = new RstStreamCounter(maxRstCount, maxRstPeriodNanos, ticker);
264
    // Create the local flow controller configured to auto-refill the connection window.
265
    connection.local().flowController(
1✔
266
        new DefaultHttp2LocalFlowController(connection, DEFAULT_WINDOW_UPDATE_RATIO, true));
267
    frameWriter = new WriteMonitoringFrameWriter(frameWriter, keepAliveEnforcer);
1✔
268
    Http2ConnectionEncoder encoder =
1✔
269
        new DefaultHttp2ConnectionEncoder(connection, frameWriter);
270
    encoder = new Http2ControlFrameLimitEncoder(encoder, 10000);
1✔
271
    encoder = new Http2RstCounterEncoder(encoder, rstStreamCounter);
1✔
272
    Http2ConnectionDecoder decoder = new DefaultHttp2ConnectionDecoder(connection, encoder,
1✔
273
        frameReader);
274

275
    Http2Settings settings = new Http2Settings();
1✔
276
    settings.initialWindowSize(flowControlWindow);
1✔
277
    settings.maxConcurrentStreams(maxStreams);
1✔
278
    settings.maxHeaderListSize(maxHeaderListSize);
1✔
279

280
    return new NettyServerHandler(
1✔
281
        channelUnused,
282
        connection,
283
        transportListener,
284
        streamTracerFactories,
285
        transportTracer,
286
        decoder, encoder, settings,
287
        maxMessageSize,
288
        maxHeaderListSize,
289
        softLimitHeaderListSize,
290
        keepAliveTimeInNanos,
291
        keepAliveTimeoutInNanos,
292
        maxConnectionIdleInNanos,
293
        maxConnectionAgeInNanos, maxConnectionAgeGraceInNanos,
294
        keepAliveEnforcer,
295
        autoFlowControl,
296
        rstStreamCounter,
297
        eagAttributes, ticker);
298
  }
299

300
  /**
   * Builds the handler. Use one of the {@code newHandler} factories rather than calling this
   * directly.
   *
   * <p>Besides storing its arguments, the constructor registers a connection listener that tells
   * the keep-alive enforcer (and the idle manager, when enabled) whenever the connection moves
   * between having no active streams and having at least one, and installs the
   * {@code FrameListener} on the decoder.
   *
   * @throws IllegalArgumentException if {@code maxMessageSize} is negative
   * @throws NullPointerException if {@code keepAliveEnforcer}, {@code eagAttributes},
   *     {@code transportListener}, {@code streamTracerFactories} or {@code transportTracer} is
   *     null
   */
  private NettyServerHandler(
      ChannelPromise channelUnused,
      final Http2Connection connection,
      ServerTransportListener transportListener,
      List<? extends ServerStreamTracer.Factory> streamTracerFactories,
      TransportTracer transportTracer,
      Http2ConnectionDecoder decoder,
      Http2ConnectionEncoder encoder,
      Http2Settings settings,
      int maxMessageSize,
      int maxHeaderListSize,
      int softLimitHeaderListSize,
      long keepAliveTimeInNanos,
      long keepAliveTimeoutInNanos,
      long maxConnectionIdleInNanos,
      long maxConnectionAgeInNanos,
      long maxConnectionAgeGraceInNanos,
      final KeepAliveEnforcer keepAliveEnforcer,
      boolean autoFlowControl,
      RstStreamCounter rstStreamCounter,
      Attributes eagAttributes,
      Ticker ticker) {
    super(
        channelUnused,
        decoder,
        encoder,
        settings,
        new ServerChannelLogger(),
        autoFlowControl,
        null,
        ticker,
        maxHeaderListSize,
        softLimitHeaderListSize);

    // A null manager means idle-connection tracking is disabled.
    final MaxConnectionIdleManager idleManager =
        maxConnectionIdleInNanos == MAX_CONNECTION_IDLE_NANOS_DISABLED
            ? null
            : new MaxConnectionIdleManager(maxConnectionIdleInNanos);

    connection.addListener(new Http2ConnectionAdapter() {
      @Override
      public void onStreamActive(Http2Stream stream) {
        // Only the first active stream marks the transport as active.
        if (connection.numActiveStreams() != 1) {
          return;
        }
        keepAliveEnforcer.onTransportActive();
        if (idleManager != null) {
          idleManager.onTransportActive();
        }
      }

      @Override
      public void onStreamClosed(Http2Stream stream) {
        // Only the last stream closing marks the transport as idle.
        if (connection.numActiveStreams() != 0) {
          return;
        }
        keepAliveEnforcer.onTransportIdle();
        if (idleManager != null) {
          idleManager.onTransportIdle();
        }
      }
    });

    checkArgument(maxMessageSize >= 0, "maxMessageSize must be non-negative: %s", maxMessageSize);
    this.maxMessageSize = maxMessageSize;
    this.keepAliveTimeInNanos = keepAliveTimeInNanos;
    this.keepAliveTimeoutInNanos = keepAliveTimeoutInNanos;
    this.maxConnectionIdleManager = idleManager;
    this.maxConnectionAgeInNanos = maxConnectionAgeInNanos;
    this.maxConnectionAgeGraceInNanos = maxConnectionAgeGraceInNanos;
    this.keepAliveEnforcer = checkNotNull(keepAliveEnforcer, "keepAliveEnforcer");
    this.rstStreamCounter = rstStreamCounter;
    this.eagAttributes = checkNotNull(eagAttributes, "eagAttributes");

    this.streamKey = encoder.connection().newKey();
    this.transportListener = checkNotNull(transportListener, "transportListener");
    this.streamTracerFactories = checkNotNull(streamTracerFactories, "streamTracerFactories");
    this.transportTracer = checkNotNull(transportTracer, "transportTracer");
    // Set the frame listener on the decoder.
    decoder().frameListener(new FrameListener());
  }
1✔
381

382
  @Nullable
383
  Throwable connectionError() {
384
    return connectionError;
1✔
385
  }
386

387
  @Override
388
  public void handlerAdded(final ChannelHandlerContext ctx) throws Exception {
389
    serverWriteQueue = new WriteQueue(ctx.channel());
1✔
390

391
    // init max connection age monitor
392
    if (maxConnectionAgeInNanos != MAX_CONNECTION_AGE_NANOS_DISABLED) {
1✔
393
      maxConnectionAgeMonitor = ctx.executor().schedule(
1✔
394
          new LogExceptionRunnable(new Runnable() {
1✔
395
            @Override
396
            public void run() {
397
              if (gracefulShutdown == null) {
1✔
398
                gracefulShutdown = new GracefulShutdown("max_age", maxConnectionAgeGraceInNanos);
1✔
399
                gracefulShutdown.start(ctx);
1✔
400
                ctx.flush();
1✔
401
              }
402
            }
1✔
403
          }),
404
          maxConnectionAgeInNanos,
405
          TimeUnit.NANOSECONDS);
406
    }
407

408
    if (maxConnectionIdleManager != null) {
1✔
409
      maxConnectionIdleManager.start(new Runnable() {
1✔
410
        @Override
411
        public void run() {
412
          if (gracefulShutdown == null) {
1✔
413
            gracefulShutdown = new GracefulShutdown("max_idle", null);
1✔
414
            gracefulShutdown.start(ctx);
1✔
415
            ctx.flush();
1✔
416
          }
417
        }
1✔
418
      }, ctx.executor());
1✔
419
    }
420

421
    if (keepAliveTimeInNanos != SERVER_KEEPALIVE_TIME_NANOS_DISABLED) {
1✔
422
      keepAliveManager = new KeepAliveManager(new KeepAlivePinger(ctx), ctx.executor(),
1✔
423
          keepAliveTimeInNanos, keepAliveTimeoutInNanos, true /* keepAliveDuringTransportIdle */);
424
      keepAliveManager.onTransportStarted();
1✔
425
    }
426

427
    assert encoder().connection().equals(decoder().connection());
1✔
428
    transportTracer.setFlowControlWindowReader(new Utils.FlowControlReader(encoder().connection()));
1✔
429

430
    super.handlerAdded(ctx);
1✔
431
  }
1✔
432

433
  private void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers)
434
      throws Http2Exception {
435
    try {
436
      // Connection-specific header fields makes a request malformed. Ideally this would be handled
437
      // by Netty. RFC 7540 section 8.1.2.2
438
      if (!DISABLE_CONNECTION_HEADER_CHECK && headers.contains(CONNECTION)) {
1✔
439
        resetStream(ctx, streamId, Http2Error.PROTOCOL_ERROR.code(), ctx.newPromise());
×
440
        return;
×
441
      }
442

443
      if (headers.authority() == null) {
1✔
444
        List<CharSequence> hosts = headers.getAll(HOST);
1✔
445
        if (hosts.size() > 1) {
1✔
446
          // RFC 7230 section 5.4
447
          respondWithHttpError(ctx, streamId, 400, Status.Code.INTERNAL,
1✔
448
              "Multiple host headers");
449
          return;
1✔
450
        }
451
        if (!hosts.isEmpty()) {
1✔
452
          headers.add(AUTHORITY.value(), hosts.get(0));
1✔
453
        }
454
      }
455
      headers.remove(HOST);
1✔
456

457
      // Remove the leading slash of the path and get the fully qualified method name
458
      CharSequence path = headers.path();
1✔
459

460
      if (path == null) {
1✔
461
        respondWithHttpError(ctx, streamId, 404, Status.Code.UNIMPLEMENTED,
1✔
462
            "Expected path but is missing");
463
        return;
1✔
464
      }
465

466
      if (path.charAt(0) != '/') {
1✔
467
        respondWithHttpError(ctx, streamId, 404, Status.Code.UNIMPLEMENTED,
1✔
468
            String.format("Expected path to start with /: %s", path));
1✔
469
        return;
1✔
470
      }
471

472
      String method = path.subSequence(1, path.length()).toString();
1✔
473

474
      // Verify that the Content-Type is correct in the request.
475
      CharSequence contentType = headers.get(CONTENT_TYPE_HEADER);
1✔
476
      if (contentType == null) {
1✔
477
        respondWithHttpError(
1✔
478
            ctx, streamId, 415, Status.Code.INTERNAL, "Content-Type is missing from the request");
479
        return;
1✔
480
      }
481
      String contentTypeString = contentType.toString();
1✔
482
      if (!GrpcUtil.isGrpcContentType(contentTypeString)) {
1✔
483
        respondWithHttpError(ctx, streamId, 415, Status.Code.INTERNAL,
1✔
484
            String.format("Content-Type '%s' is not supported", contentTypeString));
1✔
485
        return;
1✔
486
      }
487

488
      if (!HTTP_METHOD.contentEquals(headers.method())) {
1✔
489
        Http2Headers extraHeaders = new DefaultHttp2Headers();
1✔
490
        extraHeaders.add(HttpHeaderNames.ALLOW, HTTP_METHOD);
1✔
491
        respondWithHttpError(ctx, streamId, 405, Status.Code.INTERNAL,
1✔
492
            String.format("Method '%s' is not supported", headers.method()), extraHeaders);
1✔
493
        return;
1✔
494
      }
495

496
      int h2HeadersSize = Utils.getH2HeadersSize(headers);
1✔
497
      if (Utils.shouldRejectOnMetadataSizeSoftLimitExceeded(
1✔
498
              h2HeadersSize, softLimitHeaderListSize, maxHeaderListSize)) {
499
        respondWithHttpError(ctx, streamId, 431, Status.Code.RESOURCE_EXHAUSTED, String.format(
×
500
                "Client Headers of size %d exceeded Metadata size soft limit: %d",
501
                h2HeadersSize,
×
502
                softLimitHeaderListSize));
×
503
        return;
×
504
      }
505

506
      if (!teWarningLogged && !TE_TRAILERS.contentEquals(headers.get(TE_HEADER))) {
1✔
507
        logger.warning(String.format("Expected header TE: %s, but %s is received. This means "
1✔
508
                + "some intermediate proxy may not support trailers",
509
            TE_TRAILERS, headers.get(TE_HEADER)));
1✔
510
        teWarningLogged = true;
1✔
511
      }
512

513
      // The Http2Stream object was put by AbstractHttp2ConnectionHandler before calling this
514
      // method.
515
      Http2Stream http2Stream = requireHttp2Stream(streamId);
1✔
516

517
      Metadata metadata = Utils.convertHeaders(headers);
1✔
518
      StatsTraceContext statsTraceCtx =
1✔
519
          StatsTraceContext.newServerContext(streamTracerFactories, method, metadata);
1✔
520

521
      NettyServerStream.TransportState state = new NettyServerStream.TransportState(
1✔
522
          this,
523
          ctx.channel().eventLoop(),
1✔
524
          http2Stream,
525
          maxMessageSize,
526
          statsTraceCtx,
527
          transportTracer,
528
          method);
529

530
      try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.onHeadersRead")) {
1✔
531
        PerfMark.attachTag(state.tag());
1✔
532
        String authority = getOrUpdateAuthority((AsciiString) headers.authority());
1✔
533
        NettyServerStream stream = new NettyServerStream(
1✔
534
            ctx.channel(),
1✔
535
            state,
536
            attributes,
537
            authority,
538
            statsTraceCtx);
539
        transportListener.streamCreated(stream, method, metadata);
1✔
540
        state.onStreamAllocated();
1✔
541
        http2Stream.setProperty(streamKey, state);
1✔
542
      }
543
    } catch (Exception e) {
×
544
      logger.log(Level.WARNING, "Exception in onHeadersRead()", e);
×
545
      // Throw an exception that will get handled by onStreamError.
546
      throw newStreamException(streamId, e);
×
547
    }
1✔
548
  }
1✔
549

550
  private String getOrUpdateAuthority(AsciiString authority) {
551
    if (authority == null) {
1✔
552
      return null;
1✔
553
    } else if (!authority.equals(lastKnownAuthority)) {
1✔
554
      lastKnownAuthority = authority;
1✔
555
    }
556

557
    // AsciiString.toString() is internally cached, so subsequent calls will not
558
    // result in recomputing the String representation of lastKnownAuthority.
559
    return lastKnownAuthority.toString();
1✔
560
  }
561

562
  private void onDataRead(int streamId, ByteBuf data, int padding, boolean endOfStream)
563
      throws Http2Exception {
564
    flowControlPing().onDataRead(data.readableBytes(), padding);
1✔
565
    try {
566
      NettyServerStream.TransportState stream = serverStream(requireHttp2Stream(streamId));
1✔
567
      if (stream == null) {
1✔
568
        return;
1✔
569
      }
570
      try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.onDataRead")) {
1✔
571
        PerfMark.attachTag(stream.tag());
1✔
572
        stream.inboundDataReceived(data, endOfStream);
1✔
573
      }
574
    } catch (Throwable e) {
×
575
      logger.log(Level.WARNING, "Exception in onDataRead()", e);
×
576
      // Throw an exception that will get handled by onStreamError.
577
      throw newStreamException(streamId, e);
×
578
    }
1✔
579
  }
1✔
580

581
  private void onRstStreamRead(int streamId, long errorCode) throws Http2Exception {
582
    Http2Exception tooManyRstStream = rstStreamCounter.countRstStream();
1✔
583
    if (tooManyRstStream != null) {
1✔
584
      throw tooManyRstStream;
1✔
585
    }
586

587
    try {
588
      NettyServerStream.TransportState stream = serverStream(connection().stream(streamId));
1✔
589
      if (stream != null) {
1✔
590
        try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.onRstStreamRead")) {
1✔
591
          PerfMark.attachTag(stream.tag());
1✔
592
          stream.transportReportStatus(
1✔
593
              Status.CANCELLED.withDescription("RST_STREAM received for code " + errorCode));
1✔
594
        }
595
      }
596
    } catch (Throwable e) {
×
597
      logger.log(Level.WARNING, "Exception in onRstStreamRead()", e);
×
598
      // Throw an exception that will get handled by onStreamError.
599
      throw newStreamException(streamId, e);
×
600
    }
1✔
601
  }
1✔
602

603
  @Override
604
  protected void onConnectionError(ChannelHandlerContext ctx, boolean outbound, Throwable cause,
605
      Http2Exception http2Ex) {
606
    logger.log(Level.FINE, "Connection Error", cause);
1✔
607
    connectionError = cause;
1✔
608
    super.onConnectionError(ctx, outbound, cause, http2Ex);
1✔
609
  }
1✔
610

611
  @Override
612
  protected void onStreamError(ChannelHandlerContext ctx, boolean outbound, Throwable cause,
613
      StreamException http2Ex) {
614
    NettyServerStream.TransportState serverStream = serverStream(
1✔
615
        connection().stream(Http2Exception.streamId(http2Ex)));
1✔
616
    Level level = Level.WARNING;
1✔
617
    if (serverStream == null && http2Ex.error() == Http2Error.STREAM_CLOSED) {
1✔
618
      level = Level.FINE;
×
619
    }
620
    logger.log(level, "Stream Error", cause);
1✔
621
    Tag tag = serverStream != null ? serverStream.tag() : PerfMark.createTag();
1✔
622
    try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.onStreamError")) {
1✔
623
      PerfMark.attachTag(tag);
1✔
624
      if (serverStream != null) {
1✔
625
        serverStream.transportReportStatus(Utils.statusFromThrowable(cause));
1✔
626
      }
627
      // TODO(ejona): Abort the stream by sending headers to help the client with debugging.
628
      // Delegate to the base class to send a RST_STREAM.
629
      super.onStreamError(ctx, outbound, cause, http2Ex);
1✔
630
    }
631
  }
1✔
632

633
  @Override
634
  public void handleProtocolNegotiationCompleted(
635
      Attributes attrs, InternalChannelz.Security securityInfo) {
636
    negotiationAttributes = attrs;
1✔
637
    this.securityInfo = securityInfo;
1✔
638
    super.handleProtocolNegotiationCompleted(attrs, securityInfo);
1✔
639
    NettyClientHandler.writeBufferingAndRemove(ctx().channel());
1✔
640
  }
1✔
641

642
  @Override
643
  public Attributes getEagAttributes() {
644
    return eagAttributes;
1✔
645
  }
646

647
  InternalChannelz.Security getSecurityInfo() {
648
    return securityInfo;
1✔
649
  }
650

651
  @VisibleForTesting
652
  KeepAliveManager getKeepAliveManagerForTest() {
653
    return keepAliveManager;
1✔
654
  }
655

656
  @VisibleForTesting
657
  void setKeepAliveManagerForTest(KeepAliveManager keepAliveManager) {
658
    this.keepAliveManager = keepAliveManager;
1✔
659
  }
1✔
660

661
  /**
662
   * Handler for the Channel shutting down.
663
   */
664
  @Override
665
  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
666
    try {
667
      if (keepAliveManager != null) {
1✔
668
        keepAliveManager.onTransportTermination();
1✔
669
      }
670
      if (maxConnectionIdleManager != null) {
1✔
671
        maxConnectionIdleManager.onTransportTermination();
1✔
672
      }
673
      if (maxConnectionAgeMonitor != null) {
1✔
674
        maxConnectionAgeMonitor.cancel(false);
1✔
675
      }
676
      final Status status =
1✔
677
          Status.UNAVAILABLE.withDescription("connection terminated for unknown reason");
1✔
678
      // Any streams that are still active must be closed
679
      connection().forEachActiveStream(new Http2StreamVisitor() {
1✔
680
        @Override
681
        public boolean visit(Http2Stream stream) throws Http2Exception {
682
          NettyServerStream.TransportState serverStream = serverStream(stream);
1✔
683
          if (serverStream != null) {
1✔
684
            serverStream.transportReportStatus(status);
1✔
685
          }
686
          return true;
1✔
687
        }
688
      });
689
    } finally {
690
      super.channelInactive(ctx);
1✔
691
    }
692
  }
1✔
693

694
  WriteQueue getWriteQueue() {
695
    return serverWriteQueue;
1✔
696
  }
697

698
  /** Handler for commands sent from the stream. */
699
  @Override
700
  public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
701
      throws Exception {
702
    if (msg instanceof SendGrpcFrameCommand) {
1✔
703
      sendGrpcFrame(ctx, (SendGrpcFrameCommand) msg, promise);
1✔
704
    } else if (msg instanceof SendResponseHeadersCommand) {
1✔
705
      sendResponseHeaders(ctx, (SendResponseHeadersCommand) msg, promise);
1✔
706
    } else if (msg instanceof CancelServerStreamCommand) {
1✔
707
      cancelStream(ctx, (CancelServerStreamCommand) msg, promise);
1✔
708
    } else if (msg instanceof GracefulServerCloseCommand) {
1✔
709
      gracefulClose(ctx, (GracefulServerCloseCommand) msg, promise);
1✔
710
    } else if (msg instanceof ForcefulCloseCommand) {
1✔
711
      forcefulClose(ctx, (ForcefulCloseCommand) msg, promise);
1✔
712
    } else {
713
      AssertionError e =
×
714
          new AssertionError("Write called for unexpected type: " + msg.getClass().getName());
×
715
      ReferenceCountUtil.release(msg);
×
716
      promise.setFailure(e);
×
717
      throw e;
×
718
    }
719
  }
1✔
720

721
  @Override
722
  public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
723
    gracefulClose(ctx, new GracefulServerCloseCommand("app_requested"), promise);
1✔
724
    ctx.flush();
1✔
725
  }
1✔
726

727
  /**
728
   * Returns the given processed bytes back to inbound flow control.
729
   */
730
  void returnProcessedBytes(Http2Stream http2Stream, int bytes) {
731
    try {
732
      decoder().flowController().consumeBytes(http2Stream, bytes);
1✔
733
    } catch (Http2Exception e) {
×
734
      throw new RuntimeException(e);
×
735
    }
1✔
736
  }
1✔
737

738
  private void closeStreamWhenDone(ChannelPromise promise, Http2Stream stream) {
739
    promise.addListener(
1✔
740
        new ChannelFutureListener() {
1✔
741
          @Override
742
          public void operationComplete(ChannelFuture future) {
743
            serverStream(stream).complete();
1✔
744
          }
1✔
745
        });
746
  }
1✔
747

748
  private static void streamGone(int streamId, ChannelPromise promise) {
749
    promise.setFailure(
1✔
750
        new IllegalStateException(
751
            "attempting to write to stream " + streamId + " that no longer exists") {
1✔
752
          @Override
753
          public synchronized Throwable fillInStackTrace() {
754
            return this;
1✔
755
          }
756
        });
757
  }
1✔
758

759
  /** Sends the given gRPC frame to the client. */
760
  private void sendGrpcFrame(
761
      ChannelHandlerContext ctx, SendGrpcFrameCommand cmd, ChannelPromise promise)
762
      throws Http2Exception {
763
    try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.sendGrpcFrame")) {
1✔
764
      PerfMark.attachTag(cmd.stream().tag());
1✔
765
      PerfMark.linkIn(cmd.getLink());
1✔
766
      int streamId = cmd.stream().id();
1✔
767
      Http2Stream stream = connection().stream(streamId);
1✔
768
      if (stream == null) {
1✔
769
        cmd.release();
1✔
770
        streamGone(streamId, promise);
1✔
771
        return;
1✔
772
      }
773
      if (cmd.endStream()) {
1✔
774
        closeStreamWhenDone(promise, stream);
×
775
      }
776
      // Call the base class to write the HTTP/2 DATA frame.
777
      encoder().writeData(ctx, streamId, cmd.content(), 0, cmd.endStream(), promise);
1✔
778
    }
779
  }
1✔
780

781
  /**
782
   * Sends the response headers to the client.
783
   */
784
  private void sendResponseHeaders(ChannelHandlerContext ctx, SendResponseHeadersCommand cmd,
785
      ChannelPromise promise) throws Http2Exception {
786
    try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.sendResponseHeaders")) {
1✔
787
      PerfMark.attachTag(cmd.stream().tag());
1✔
788
      PerfMark.linkIn(cmd.getLink());
1✔
789
      int streamId = cmd.stream().id();
1✔
790
      Http2Stream stream = connection().stream(streamId);
1✔
791
      if (stream == null) {
1✔
792
        streamGone(streamId, promise);
1✔
793
        return;
1✔
794
      }
795
      if (cmd.endOfStream()) {
1✔
796
        closeStreamWhenDone(promise, stream);
1✔
797
      }
798
      encoder().writeHeaders(ctx, streamId, cmd.headers(), 0, cmd.endOfStream(), promise);
1✔
799
    }
800
  }
1✔
801

802
  private void cancelStream(ChannelHandlerContext ctx, CancelServerStreamCommand cmd,
803
      ChannelPromise promise) {
804
    try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.cancelStream")) {
1✔
805
      PerfMark.attachTag(cmd.stream().tag());
1✔
806
      PerfMark.linkIn(cmd.getLink());
1✔
807
      // Notify the listener if we haven't already.
808
      cmd.stream().transportReportStatus(cmd.reason());
1✔
809

810
      // Now we need to decide how we're going to notify the peer that this stream is closed.
811
      // If possible, it's nice to inform the peer _why_ this stream was cancelled by sending
812
      // a structured headers frame.
813
      if (shouldCloseStreamWithHeaders(cmd, connection())) {
1✔
814
        Metadata md = new Metadata();
1✔
815
        md.put(InternalStatus.CODE_KEY, cmd.reason());
1✔
816
        if (cmd.reason().getDescription() != null) {
1✔
817
          md.put(InternalStatus.MESSAGE_KEY, cmd.reason().getDescription());
1✔
818
        }
819
        Http2Headers headers = Utils.convertServerHeaders(md);
1✔
820
        encoder().writeHeaders(
1✔
821
            ctx, cmd.stream().id(), headers, /* padding = */ 0, /* endStream = */ true, promise);
1✔
822
      } else {
1✔
823
        // Terminate the stream.
824
        encoder().writeRstStream(ctx, cmd.stream().id(), Http2Error.CANCEL.code(), promise);
1✔
825
      }
826
    }
827
  }
1✔
828

829
  // Determine whether a CancelServerStreamCommand should try to close the stream with a
830
  // HEADERS or a RST_STREAM frame. The caller has some influence over this (they can
831
  // configure cmd.wantsHeaders()). The state of the stream also has an influence: we
832
  // only try to send HEADERS if the stream exists and hasn't already sent any headers.
833
  private static boolean shouldCloseStreamWithHeaders(
834
          CancelServerStreamCommand cmd, Http2Connection conn) {
835
    if (!cmd.wantsHeaders()) {
1✔
836
      return false;
1✔
837
    }
838
    Http2Stream stream = conn.stream(cmd.stream().id());
1✔
839
    return stream != null && !stream.isHeadersSent();
1✔
840
  }
841

842
  private void gracefulClose(final ChannelHandlerContext ctx, final GracefulServerCloseCommand msg,
843
      ChannelPromise promise) throws Exception {
844
    // Ideally we'd adjust a pre-existing graceful shutdown's grace period to at least what is
845
    // requested here. But that's an edge case and seems bug-prone.
846
    if (gracefulShutdown == null) {
1✔
847
      Long graceTimeInNanos = null;
1✔
848
      if (msg.getGraceTimeUnit() != null) {
1✔
849
        graceTimeInNanos = msg.getGraceTimeUnit().toNanos(msg.getGraceTime());
1✔
850
      }
851
      gracefulShutdown = new GracefulShutdown(msg.getGoAwayDebugString(), graceTimeInNanos);
1✔
852
      gracefulShutdown.start(ctx);
1✔
853
    }
854
    promise.setSuccess();
1✔
855
  }
1✔
856

857
  private void forcefulClose(final ChannelHandlerContext ctx, final ForcefulCloseCommand msg,
858
      ChannelPromise promise) throws Exception {
859
    super.close(ctx, promise);
1✔
860
    connection().forEachActiveStream(new Http2StreamVisitor() {
1✔
861
      @Override
862
      public boolean visit(Http2Stream stream) throws Http2Exception {
863
        NettyServerStream.TransportState serverStream = serverStream(stream);
1✔
864
        if (serverStream != null) {
1✔
865
          try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.forcefulClose")) {
1✔
866
            PerfMark.attachTag(serverStream.tag());
1✔
867
            PerfMark.linkIn(msg.getLink());
1✔
868
            serverStream.transportReportStatus(msg.getStatus());
1✔
869
            resetStream(ctx, stream.id(), Http2Error.CANCEL.code(), ctx.newPromise());
1✔
870
          }
871
        }
872
        stream.close();
1✔
873
        return true;
1✔
874
      }
875
    });
876
  }
1✔
877

878
  private void respondWithHttpError(
879
      ChannelHandlerContext ctx, int streamId, int code, Status.Code statusCode, String msg) {
880
    respondWithHttpError(ctx, streamId, code, statusCode, msg, EmptyHttp2Headers.INSTANCE);
1✔
881
  }
1✔
882

883
  private void respondWithHttpError(
884
      ChannelHandlerContext ctx, int streamId, int code, Status.Code statusCode, String msg,
885
      Http2Headers extraHeaders) {
886
    Metadata metadata = new Metadata();
1✔
887
    metadata.put(InternalStatus.CODE_KEY, statusCode.toStatus());
1✔
888
    metadata.put(InternalStatus.MESSAGE_KEY, msg);
1✔
889
    byte[][] serialized = InternalMetadata.serialize(metadata);
1✔
890

891
    Http2Headers headers = new DefaultHttp2Headers(true, serialized.length / 2)
1✔
892
        .status("" + code)
1✔
893
        .set(CONTENT_TYPE_HEADER, "text/plain; charset=utf-8");
1✔
894
    for (int i = 0; i < serialized.length; i += 2) {
1✔
895
      headers.add(new AsciiString(serialized[i], false), new AsciiString(serialized[i + 1], false));
1✔
896
    }
897
    headers.add(extraHeaders);
1✔
898
    encoder().writeHeaders(ctx, streamId, headers, 0, false, ctx.newPromise());
1✔
899
    ByteBuf msgBuf = ByteBufUtil.writeUtf8(ctx.alloc(), msg);
1✔
900
    encoder().writeData(ctx, streamId, msgBuf, 0, true, ctx.newPromise());
1✔
901
  }
1✔
902

903
  private Http2Stream requireHttp2Stream(int streamId) {
904
    Http2Stream stream = connection().stream(streamId);
1✔
905
    if (stream == null) {
1✔
906
      // This should never happen.
907
      throw new AssertionError("Stream does not exist: " + streamId);
×
908
    }
909
    return stream;
1✔
910
  }
911

912
  /**
913
   * Returns the server stream associated to the given HTTP/2 stream object.
914
   */
915
  private NettyServerStream.TransportState serverStream(Http2Stream stream) {
916
    return stream == null ? null : (NettyServerStream.TransportState) stream.getProperty(streamKey);
1✔
917
  }
918

919
  private Http2Exception newStreamException(int streamId, Throwable cause) {
920
    return Http2Exception.streamError(
×
921
        streamId, Http2Error.INTERNAL_ERROR, cause, Strings.nullToEmpty(cause.getMessage()));
×
922
  }
923

924
  private class FrameListener extends Http2FrameAdapter {
1✔
925
    private boolean firstSettings = true;
1✔
926

927
    @Override
928
    public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) {
929
      if (firstSettings) {
1✔
930
        firstSettings = false;
1✔
931
        // Delay transportReady until we see the client's HTTP handshake, for coverage with
932
        // handshakeTimeout
933
        attributes = transportListener.transportReady(negotiationAttributes);
1✔
934
      }
935
    }
1✔
936

937
    @Override
938
    public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
939
        boolean endOfStream) throws Http2Exception {
940
      if (keepAliveManager != null) {
1✔
941
        keepAliveManager.onDataReceived();
1✔
942
      }
943
      NettyServerHandler.this.onDataRead(streamId, data, padding, endOfStream);
1✔
944
      return padding;
1✔
945
    }
946

947
    @Override
948
    public void onHeadersRead(ChannelHandlerContext ctx,
949
        int streamId,
950
        Http2Headers headers,
951
        int streamDependency,
952
        short weight,
953
        boolean exclusive,
954
        int padding,
955
        boolean endStream) throws Http2Exception {
956
      if (keepAliveManager != null) {
1✔
957
        keepAliveManager.onDataReceived();
1✔
958
      }
959
      NettyServerHandler.this.onHeadersRead(ctx, streamId, headers);
1✔
960
      if (endStream) {
1✔
961
        NettyServerHandler.this.onDataRead(streamId, Unpooled.EMPTY_BUFFER, 0, endStream);
×
962
      }
963
    }
1✔
964

965
    @Override
966
    public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode)
967
        throws Http2Exception {
968
      if (keepAliveManager != null) {
1✔
969
        keepAliveManager.onDataReceived();
1✔
970
      }
971
      NettyServerHandler.this.onRstStreamRead(streamId, errorCode);
1✔
972
    }
1✔
973

974
    @Override
975
    public void onPingRead(ChannelHandlerContext ctx, long data) throws Http2Exception {
976
      if (keepAliveManager != null) {
1✔
977
        keepAliveManager.onDataReceived();
1✔
978
      }
979
      if (!keepAliveEnforcer.pingAcceptable()) {
1✔
980
        ByteBuf debugData = ByteBufUtil.writeAscii(ctx.alloc(), "too_many_pings");
1✔
981
        goAway(ctx, connection().remote().lastStreamCreated(), Http2Error.ENHANCE_YOUR_CALM.code(),
1✔
982
            debugData, ctx.newPromise());
1✔
983
        Status status = Status.RESOURCE_EXHAUSTED.withDescription("Too many pings from client");
1✔
984
        try {
985
          forcefulClose(ctx, new ForcefulCloseCommand(status), ctx.newPromise());
1✔
986
        } catch (Exception ex) {
×
987
          onError(ctx, /* outbound= */ true, ex);
×
988
        }
1✔
989
      }
990
    }
1✔
991

992
    @Override
993
    public void onPingAckRead(ChannelHandlerContext ctx, long data) throws Http2Exception {
994
      if (keepAliveManager != null) {
1✔
995
        keepAliveManager.onDataReceived();
1✔
996
      }
997
      if (data == flowControlPing().payload()) {
1✔
998
        flowControlPing().updateWindow();
1✔
999
        logger.log(Level.FINE, "Window: {0}",
1✔
1000
            decoder().flowController().initialWindowSize(connection().connectionStream()));
1✔
1001
      } else if (data == GRACEFUL_SHUTDOWN_PING) {
1✔
1002
        if (gracefulShutdown == null) {
1✔
1003
          // this should never happen
1004
          logger.warning("Received GRACEFUL_SHUTDOWN_PING Ack but gracefulShutdown is null");
×
1005
        } else {
1006
          gracefulShutdown.secondGoAwayAndClose(ctx);
1✔
1007
        }
1008
      } else if (data != KEEPALIVE_PING) {
1✔
1009
        logger.warning("Received unexpected ping ack. No ping outstanding");
×
1010
      }
1011
    }
1✔
1012
  }
1013

1014
  private final class KeepAlivePinger implements KeepAliveManager.KeepAlivePinger {
1015
    final ChannelHandlerContext ctx;
1016

1017
    KeepAlivePinger(ChannelHandlerContext ctx) {
1✔
1018
      this.ctx = ctx;
1✔
1019
    }
1✔
1020

1021
    @Override
1022
    public void ping() {
1023
      ChannelFuture pingFuture = encoder().writePing(
1✔
1024
          ctx, false /* isAck */, KEEPALIVE_PING, ctx.newPromise());
1✔
1025
      ctx.flush();
1✔
1026
      pingFuture.addListener(new ChannelFutureListener() {
1✔
1027
        @Override
1028
        public void operationComplete(ChannelFuture future) throws Exception {
1029
          if (future.isSuccess()) {
1✔
1030
            transportTracer.reportKeepAliveSent();
1✔
1031
          }
1032
        }
1✔
1033
      });
1034
    }
1✔
1035

1036
    @Override
1037
    public void onPingTimeout() {
1038
      try {
1039
        forcefulClose(
1✔
1040
            ctx,
1041
            new ForcefulCloseCommand(Status.UNAVAILABLE
1042
                .withDescription("Keepalive failed. The connection is likely gone")),
1✔
1043
            ctx.newPromise());
1✔
1044
      } catch (Exception ex) {
×
1045
        try {
1046
          exceptionCaught(ctx, ex);
×
1047
        } catch (Exception ex2) {
×
1048
          logger.log(Level.WARNING, "Exception while propagating exception", ex2);
×
1049
          logger.log(Level.WARNING, "Original failure", ex);
×
1050
        }
×
1051
      }
1✔
1052
    }
1✔
1053
  }
1054

1055
  private final class GracefulShutdown {
1056
    String goAwayMessage;
1057

1058
    /**
1059
     * The grace time between starting graceful shutdown and closing the netty channel,
1060
     * {@code null} is unspecified.
1061
     */
1062
    @CheckForNull
1063
    Long graceTimeInNanos;
1064

1065
    /**
1066
     * True if ping is Acked or ping is timeout.
1067
     */
1068
    boolean pingAckedOrTimeout;
1069

1070
    Future<?> pingFuture;
1071

1072
    GracefulShutdown(String goAwayMessage,
1073
        @Nullable Long graceTimeInNanos) {
1✔
1074
      this.goAwayMessage = goAwayMessage;
1✔
1075
      this.graceTimeInNanos = graceTimeInNanos;
1✔
1076
    }
1✔
1077

1078
    /**
1079
     * Sends out first GOAWAY and ping, and schedules second GOAWAY and close.
1080
     */
1081
    void start(final ChannelHandlerContext ctx) {
1082
      goAway(
1✔
1083
          ctx,
1084
          Integer.MAX_VALUE,
1085
          Http2Error.NO_ERROR.code(),
1✔
1086
          ByteBufUtil.writeAscii(ctx.alloc(), goAwayMessage),
1✔
1087
          ctx.newPromise());
1✔
1088

1089
      pingFuture = ctx.executor().schedule(
1✔
1090
          new Runnable() {
1✔
1091
            @Override
1092
            public void run() {
1093
              secondGoAwayAndClose(ctx);
1✔
1094
            }
1✔
1095
          },
1096
          GRACEFUL_SHUTDOWN_PING_TIMEOUT_NANOS,
1✔
1097
          TimeUnit.NANOSECONDS);
1098

1099
      encoder().writePing(ctx, false /* isAck */, GRACEFUL_SHUTDOWN_PING, ctx.newPromise());
1✔
1100
    }
1✔
1101

1102
    void secondGoAwayAndClose(ChannelHandlerContext ctx) {
1103
      if (pingAckedOrTimeout) {
1✔
1104
        return;
×
1105
      }
1106
      pingAckedOrTimeout = true;
1✔
1107

1108
      checkNotNull(pingFuture, "pingFuture");
1✔
1109
      pingFuture.cancel(false);
1✔
1110

1111
      // send the second GOAWAY with last stream id
1112
      goAway(
1✔
1113
          ctx,
1114
          connection().remote().lastStreamCreated(),
1✔
1115
          Http2Error.NO_ERROR.code(),
1✔
1116
          ByteBufUtil.writeAscii(ctx.alloc(), goAwayMessage),
1✔
1117
          ctx.newPromise());
1✔
1118

1119
      // gracefully shutdown with specified grace time
1120
      long savedGracefulShutdownTimeMillis = gracefulShutdownTimeoutMillis();
1✔
1121
      long overriddenGraceTime = graceTimeOverrideMillis(savedGracefulShutdownTimeMillis);
1✔
1122
      try {
1123
        gracefulShutdownTimeoutMillis(overriddenGraceTime);
1✔
1124
        NettyServerHandler.super.close(ctx, ctx.newPromise());
1✔
1125
      } catch (Exception e) {
×
1126
        onError(ctx, /* outbound= */ true, e);
×
1127
      } finally {
1128
        gracefulShutdownTimeoutMillis(savedGracefulShutdownTimeMillis);
1✔
1129
      }
1130
    }
1✔
1131

1132
    private long graceTimeOverrideMillis(long originalMillis) {
1133
      if (graceTimeInNanos == null) {
1✔
1134
        return originalMillis;
1✔
1135
      }
1136
      if (graceTimeInNanos == MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE) {
1✔
1137
        // netty treats -1 as "no timeout"
1138
        return -1L;
1✔
1139
      }
1140
      return TimeUnit.NANOSECONDS.toMillis(graceTimeInNanos);
1✔
1141
    }
1142
  }
1143

1144
  // Use a frame writer so that we know when frames are through flow control and actually being
1145
  // written.
1146
  private static class WriteMonitoringFrameWriter extends DecoratingHttp2FrameWriter {
1147
    private final KeepAliveEnforcer keepAliveEnforcer;
1148

1149
    public WriteMonitoringFrameWriter(Http2FrameWriter delegate,
1150
        KeepAliveEnforcer keepAliveEnforcer) {
1151
      super(delegate);
1✔
1152
      this.keepAliveEnforcer = keepAliveEnforcer;
1✔
1153
    }
1✔
1154

1155
    @Override
1156
    public ChannelFuture writeData(ChannelHandlerContext ctx, int streamId, ByteBuf data,
1157
        int padding, boolean endStream, ChannelPromise promise) {
1158
      keepAliveEnforcer.resetCounters();
1✔
1159
      return super.writeData(ctx, streamId, data, padding, endStream, promise);
1✔
1160
    }
1161

1162
    @Override
1163
    public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
1164
        int padding, boolean endStream, ChannelPromise promise) {
1165
      keepAliveEnforcer.resetCounters();
1✔
1166
      return super.writeHeaders(ctx, streamId, headers, padding, endStream, promise);
1✔
1167
    }
1168

1169
    @Override
1170
    public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
1171
        int streamDependency, short weight, boolean exclusive, int padding, boolean endStream,
1172
        ChannelPromise promise) {
1173
      keepAliveEnforcer.resetCounters();
×
1174
      return super.writeHeaders(ctx, streamId, headers, streamDependency, weight, exclusive,
×
1175
          padding, endStream, promise);
1176
    }
1177
  }
1178

1179
  private static final class Http2RstCounterEncoder extends DecoratingHttp2ConnectionEncoder {
1180
    private final RstStreamCounter rstStreamCounter;
1181
    private Http2LifecycleManager lifecycleManager;
1182

1183
    Http2RstCounterEncoder(Http2ConnectionEncoder encoder, RstStreamCounter rstStreamCounter) {
1184
      super(encoder);
1✔
1185
      this.rstStreamCounter = rstStreamCounter;
1✔
1186
    }
1✔
1187

1188
    @Override
1189
    public void lifecycleManager(Http2LifecycleManager lifecycleManager) {
1190
      this.lifecycleManager = lifecycleManager;
1✔
1191
      super.lifecycleManager(lifecycleManager);
1✔
1192
    }
1✔
1193

1194
    @Override
1195
    public ChannelFuture writeRstStream(
1196
        ChannelHandlerContext ctx, int streamId, long errorCode, ChannelPromise promise) {
1197
      ChannelFuture future = super.writeRstStream(ctx, streamId, errorCode, promise);
1✔
1198
      // We want to count "induced" RST_STREAM, where the server sent a reset because of a malformed
1199
      // frame.
1200
      boolean normalRst
1✔
1201
          = errorCode == Http2Error.NO_ERROR.code() || errorCode == Http2Error.CANCEL.code();
1✔
1202
      if (!normalRst) {
1✔
1203
        Http2Exception tooManyRstStream = rstStreamCounter.countRstStream();
1✔
1204
        if (tooManyRstStream != null) {
1✔
1205
          lifecycleManager.onError(ctx, true, tooManyRstStream);
1✔
1206
          ctx.close();
1✔
1207
        }
1208
      }
1209
      return future;
1✔
1210
    }
1211
  }
1212

1213
  private static final class RstStreamCounter {
1214
    private final int maxRstCount;
1215
    private final long maxRstPeriodNanos;
1216
    private final Ticker ticker;
1217
    private int rstCount;
1218
    private long lastRstNanoTime;
1219

1220
    RstStreamCounter(int maxRstCount, long maxRstPeriodNanos, Ticker ticker) {
1✔
1221
      checkArgument(maxRstCount >= 0, "maxRstCount must be non-negative: %s", maxRstCount);
1✔
1222
      this.maxRstCount = maxRstCount;
1✔
1223
      this.maxRstPeriodNanos = maxRstPeriodNanos;
1✔
1224
      this.ticker = checkNotNull(ticker, "ticker");
1✔
1225
      this.lastRstNanoTime = ticker.read();
1✔
1226
    }
1✔
1227

1228
    /** Returns non-{@code null} when the connection should be killed by the caller. */
1229
    private Http2Exception countRstStream() {
1230
      if (maxRstCount == 0) {
1✔
1231
        return null;
1✔
1232
      }
1233
      long now = ticker.read();
1✔
1234
      if (now - lastRstNanoTime > maxRstPeriodNanos) {
1✔
1235
        lastRstNanoTime = now;
1✔
1236
        rstCount = 1;
1✔
1237
      } else {
1238
        rstCount++;
1✔
1239
        if (rstCount > maxRstCount) {
1✔
1240
          return new Http2Exception(Http2Error.ENHANCE_YOUR_CALM, "too_many_rststreams") {
1✔
1241
            @SuppressWarnings("UnsynchronizedOverridesSynchronized") // No memory accesses
1242
            @Override
1243
            public Throwable fillInStackTrace() {
1244
              // Avoid the CPU cycles, since the resets may be a CPU consumption attack
1245
              return this;
1✔
1246
            }
1247
          };
1248
        }
1249
      }
1250
      return null;
1✔
1251
    }
1252
  }
1253

1254
  private static class ServerChannelLogger extends ChannelLogger {
1255
    private static final Logger log = Logger.getLogger(ChannelLogger.class.getName());
1✔
1256

1257
    @Override
1258
    public void log(ChannelLogLevel level, String message) {
1259
      log.log(toJavaLogLevel(level), message);
1✔
1260
    }
1✔
1261

1262
    @Override
1263
    public void log(ChannelLogLevel level, String messageFormat, Object... args) {
1264
      log(level, MessageFormat.format(messageFormat, args));
1✔
1265
    }
1✔
1266
  }
1267

1268
  private static Level toJavaLogLevel(ChannelLogLevel level) {
1269
    switch (level) {
1✔
1270
      case ERROR:
1271
        return Level.FINE;
×
1272
      case WARNING:
1273
        return Level.FINER;
×
1274
      default:
1275
        return Level.FINEST;
1✔
1276
    }
1277
  }
1278
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc