• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19976

10 Sep 2025 12:09PM UTC coverage: 88.581% (+0.03%) from 88.547%
#19976

push

github

web-flow
netty, okhttp: Add allow header for response code 405 (#12334)

Fixes #12329

34829 of 39319 relevant lines covered (88.58%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.11
/../netty/src/main/java/io/grpc/netty/NettyServerHandler.java
1
/*
2
 * Copyright 2014 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.netty;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static io.grpc.internal.GrpcUtil.SERVER_KEEPALIVE_TIME_NANOS_DISABLED;
22
import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE;
23
import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_AGE_NANOS_DISABLED;
24
import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_IDLE_NANOS_DISABLED;
25
import static io.grpc.netty.Utils.CONTENT_TYPE_HEADER;
26
import static io.grpc.netty.Utils.HTTP_METHOD;
27
import static io.grpc.netty.Utils.TE_HEADER;
28
import static io.grpc.netty.Utils.TE_TRAILERS;
29
import static io.netty.handler.codec.http.HttpHeaderNames.CONNECTION;
30
import static io.netty.handler.codec.http.HttpHeaderNames.HOST;
31
import static io.netty.handler.codec.http2.DefaultHttp2LocalFlowController.DEFAULT_WINDOW_UPDATE_RATIO;
32
import static io.netty.handler.codec.http2.Http2Headers.PseudoHeaderName.AUTHORITY;
33

34
import com.google.common.annotations.VisibleForTesting;
35
import com.google.common.base.Preconditions;
36
import com.google.common.base.Strings;
37
import com.google.common.base.Ticker;
38
import io.grpc.Attributes;
39
import io.grpc.ChannelLogger;
40
import io.grpc.ChannelLogger.ChannelLogLevel;
41
import io.grpc.InternalChannelz;
42
import io.grpc.InternalMetadata;
43
import io.grpc.InternalStatus;
44
import io.grpc.Metadata;
45
import io.grpc.ServerStreamTracer;
46
import io.grpc.Status;
47
import io.grpc.internal.GrpcUtil;
48
import io.grpc.internal.KeepAliveEnforcer;
49
import io.grpc.internal.KeepAliveManager;
50
import io.grpc.internal.LogExceptionRunnable;
51
import io.grpc.internal.MaxConnectionIdleManager;
52
import io.grpc.internal.ServerTransportListener;
53
import io.grpc.internal.StatsTraceContext;
54
import io.grpc.internal.TransportTracer;
55
import io.grpc.netty.GrpcHttp2HeadersUtils.GrpcHttp2ServerHeadersDecoder;
56
import io.netty.buffer.ByteBuf;
57
import io.netty.buffer.ByteBufUtil;
58
import io.netty.buffer.Unpooled;
59
import io.netty.channel.ChannelFuture;
60
import io.netty.channel.ChannelFutureListener;
61
import io.netty.channel.ChannelHandlerContext;
62
import io.netty.channel.ChannelPromise;
63
import io.netty.handler.codec.http.HttpHeaderNames;
64
import io.netty.handler.codec.http2.DecoratingHttp2ConnectionEncoder;
65
import io.netty.handler.codec.http2.DecoratingHttp2FrameWriter;
66
import io.netty.handler.codec.http2.DefaultHttp2Connection;
67
import io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder;
68
import io.netty.handler.codec.http2.DefaultHttp2ConnectionEncoder;
69
import io.netty.handler.codec.http2.DefaultHttp2FrameReader;
70
import io.netty.handler.codec.http2.DefaultHttp2FrameWriter;
71
import io.netty.handler.codec.http2.DefaultHttp2Headers;
72
import io.netty.handler.codec.http2.DefaultHttp2LocalFlowController;
73
import io.netty.handler.codec.http2.DefaultHttp2RemoteFlowController;
74
import io.netty.handler.codec.http2.EmptyHttp2Headers;
75
import io.netty.handler.codec.http2.Http2Connection;
76
import io.netty.handler.codec.http2.Http2ConnectionAdapter;
77
import io.netty.handler.codec.http2.Http2ConnectionDecoder;
78
import io.netty.handler.codec.http2.Http2ConnectionEncoder;
79
import io.netty.handler.codec.http2.Http2Error;
80
import io.netty.handler.codec.http2.Http2Exception;
81
import io.netty.handler.codec.http2.Http2Exception.StreamException;
82
import io.netty.handler.codec.http2.Http2FrameAdapter;
83
import io.netty.handler.codec.http2.Http2FrameLogger;
84
import io.netty.handler.codec.http2.Http2FrameReader;
85
import io.netty.handler.codec.http2.Http2FrameWriter;
86
import io.netty.handler.codec.http2.Http2Headers;
87
import io.netty.handler.codec.http2.Http2HeadersDecoder;
88
import io.netty.handler.codec.http2.Http2InboundFrameLogger;
89
import io.netty.handler.codec.http2.Http2LifecycleManager;
90
import io.netty.handler.codec.http2.Http2OutboundFrameLogger;
91
import io.netty.handler.codec.http2.Http2Settings;
92
import io.netty.handler.codec.http2.Http2Stream;
93
import io.netty.handler.codec.http2.Http2StreamVisitor;
94
import io.netty.handler.codec.http2.UniformStreamByteDistributor;
95
import io.netty.handler.logging.LogLevel;
96
import io.netty.util.AsciiString;
97
import io.netty.util.ReferenceCountUtil;
98
import io.perfmark.PerfMark;
99
import io.perfmark.Tag;
100
import io.perfmark.TaskCloseable;
101
import java.text.MessageFormat;
102
import java.util.List;
103
import java.util.concurrent.Future;
104
import java.util.concurrent.ScheduledFuture;
105
import java.util.concurrent.TimeUnit;
106
import java.util.logging.Level;
107
import java.util.logging.Logger;
108
import javax.annotation.CheckForNull;
109
import javax.annotation.Nullable;
110

111
/**
112
 * Server-side Netty handler for GRPC processing. All event handlers are executed entirely within
113
 * the context of the Netty Channel thread.
114
 */
115
class NettyServerHandler extends AbstractNettyHandler {
116
  // Logger used by this handler for header/stream warnings and connection errors.
  private static final Logger logger = Logger.getLogger(NettyServerHandler.class.getName());
1✔
117
  // NOTE(review): presumably the payload of keepalive PING frames; its use is outside this view.
  private static final long KEEPALIVE_PING = 0xDEADL;
118
  // NOTE(review): presumably the payload of the PING sent during graceful shutdown - confirm.
  @VisibleForTesting
119
  static final long GRACEFUL_SHUTDOWN_PING = 0x97ACEF001L;
120
  // Ten seconds, expressed in nanoseconds.
  private static final long GRACEFUL_SHUTDOWN_PING_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(10);
1✔
121
  /**
   * Temporary workaround for #8674. Fine to delete after v1.45 release, and maybe earlier.
   *
   * <p>Read once from the system property {@code io.grpc.netty.disableConnectionHeaderCheck}
   * (default {@code false}). When true, onHeadersRead does not reset streams whose request
   * carries a {@code Connection} header.
   */
122
  private static final boolean DISABLE_CONNECTION_HEADER_CHECK = Boolean.parseBoolean(
1✔
123
      System.getProperty("io.grpc.netty.disableConnectionHeaderCheck", "false"));
1✔
124

125
  // Key under which onHeadersRead stores each stream's TransportState on its Http2Stream.
  private final Http2Connection.PropertyKey streamKey;
126
  // Notified via streamCreated() for every accepted request (see onHeadersRead).
  private final ServerTransportListener transportListener;
127
  // Passed to every NettyServerStream.TransportState created in onHeadersRead.
  private final int maxMessageSize;
128
  // Keepalive is not started when this equals SERVER_KEEPALIVE_TIME_NANOS_DISABLED.
  private final long keepAliveTimeInNanos;
129
  private final long keepAliveTimeoutInNanos;
130
  // No max-age timer is scheduled when this equals MAX_CONNECTION_AGE_NANOS_DISABLED.
  private final long maxConnectionAgeInNanos;
131
  // Handed to the GracefulShutdown started by the max-age timer.
  private final long maxConnectionAgeGraceInNanos;
132
  // Consulted on every received RST_STREAM (see onRstStreamRead).
  private final RstStreamCounter rstStreamCounter;
133
  private final List<? extends ServerStreamTracer.Factory> streamTracerFactories;
134
  private final TransportTracer transportTracer;
135
  private final KeepAliveEnforcer keepAliveEnforcer;
136
  private final Attributes eagAttributes;
137
  /** Incomplete attributes produced by negotiator. */
138
  private Attributes negotiationAttributes;
139
  // Stored by handleProtocolNegotiationCompleted; exposed through getSecurityInfo().
  private InternalChannelz.Security securityInfo;
140
  /** Completed attributes produced by transportReady. */
141
  private Attributes attributes;
142
  // Last cause passed to onConnectionError; exposed through connectionError().
  private Throwable connectionError;
143
  // Set once the missing/incorrect TE header warning has been logged, so it is logged only once.
  private boolean teWarningLogged;
144
  // Created in handlerAdded from the context's channel.
  private WriteQueue serverWriteQueue;
145
  // Most recent non-null authority seen by getOrUpdateAuthority.
  private AsciiString lastKnownAuthority;
146
  // Created in handlerAdded only when keepalive is enabled.
  @CheckForNull
147
  private KeepAliveManager keepAliveManager;
148
  // Null when maxConnectionIdleInNanos equals MAX_CONNECTION_IDLE_NANOS_DISABLED.
  @CheckForNull
149
  private MaxConnectionIdleManager maxConnectionIdleManager;
150
  // Scheduled in handlerAdded when max age is enabled; cancelled in channelInactive.
  @CheckForNull
151
  private ScheduledFuture<?> maxConnectionAgeMonitor;
152
  // Non-null once a graceful shutdown has been started, e.g. by the max-age or max-idle timers.
  @CheckForNull
153
  private GracefulShutdown gracefulShutdown;
154

155
  static NettyServerHandler newHandler(
156
      ServerTransportListener transportListener,
157
      ChannelPromise channelUnused,
158
      List<? extends ServerStreamTracer.Factory> streamTracerFactories,
159
      TransportTracer transportTracer,
160
      int maxStreams,
161
      boolean autoFlowControl,
162
      int flowControlWindow,
163
      int maxHeaderListSize,
164
      int softLimitHeaderListSize,
165
      int maxMessageSize,
166
      long keepAliveTimeInNanos,
167
      long keepAliveTimeoutInNanos,
168
      long maxConnectionIdleInNanos,
169
      long maxConnectionAgeInNanos,
170
      long maxConnectionAgeGraceInNanos,
171
      boolean permitKeepAliveWithoutCalls,
172
      long permitKeepAliveTimeInNanos,
173
      int maxRstCount,
174
      long maxRstPeriodNanos,
175
      Attributes eagAttributes) {
176
    Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive: %s",
1✔
177
        maxHeaderListSize);
178
    Http2FrameLogger frameLogger = new Http2FrameLogger(LogLevel.DEBUG, NettyServerHandler.class);
1✔
179
    Http2HeadersDecoder headersDecoder = new GrpcHttp2ServerHeadersDecoder(maxHeaderListSize);
1✔
180
    Http2FrameReader frameReader = new Http2InboundFrameLogger(
1✔
181
        new DefaultHttp2FrameReader(headersDecoder), frameLogger);
182
    Http2FrameWriter frameWriter =
1✔
183
        new Http2OutboundFrameLogger(new DefaultHttp2FrameWriter(), frameLogger);
184
    return newHandler(
1✔
185
        channelUnused,
186
        frameReader,
187
        frameWriter,
188
        transportListener,
189
        streamTracerFactories,
190
        transportTracer,
191
        maxStreams,
192
        autoFlowControl,
193
        flowControlWindow,
194
        maxHeaderListSize,
195
        softLimitHeaderListSize,
196
        maxMessageSize,
197
        keepAliveTimeInNanos,
198
        keepAliveTimeoutInNanos,
199
        maxConnectionIdleInNanos,
200
        maxConnectionAgeInNanos,
201
        maxConnectionAgeGraceInNanos,
202
        permitKeepAliveWithoutCalls,
203
        permitKeepAliveTimeInNanos,
204
        maxRstCount,
205
        maxRstPeriodNanos,
206
        eagAttributes,
207
        Ticker.systemTicker());
1✔
208
  }
209

210
  static NettyServerHandler newHandler(
211
      ChannelPromise channelUnused,
212
      Http2FrameReader frameReader,
213
      Http2FrameWriter frameWriter,
214
      ServerTransportListener transportListener,
215
      List<? extends ServerStreamTracer.Factory> streamTracerFactories,
216
      TransportTracer transportTracer,
217
      int maxStreams,
218
      boolean autoFlowControl,
219
      int flowControlWindow,
220
      int maxHeaderListSize,
221
      int softLimitHeaderListSize,
222
      int maxMessageSize,
223
      long keepAliveTimeInNanos,
224
      long keepAliveTimeoutInNanos,
225
      long maxConnectionIdleInNanos,
226
      long maxConnectionAgeInNanos,
227
      long maxConnectionAgeGraceInNanos,
228
      boolean permitKeepAliveWithoutCalls,
229
      long permitKeepAliveTimeInNanos,
230
      int maxRstCount,
231
      long maxRstPeriodNanos,
232
      Attributes eagAttributes,
233
      Ticker ticker) {
234
    Preconditions.checkArgument(maxStreams > 0, "maxStreams must be positive: %s", maxStreams);
1✔
235
    Preconditions.checkArgument(flowControlWindow > 0, "flowControlWindow must be positive: %s",
1✔
236
        flowControlWindow);
237
    Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive: %s",
1✔
238
        maxHeaderListSize);
239
    Preconditions.checkArgument(
1✔
240
        softLimitHeaderListSize > 0, "softLimitHeaderListSize must be positive: %s",
241
        softLimitHeaderListSize);
242
    Preconditions.checkArgument(maxMessageSize > 0, "maxMessageSize must be positive: %s",
1✔
243
        maxMessageSize);
244

245
    final Http2Connection connection = new DefaultHttp2Connection(true);
1✔
246
    UniformStreamByteDistributor dist = new UniformStreamByteDistributor(connection);
1✔
247
    dist.minAllocationChunk(MIN_ALLOCATED_CHUNK); // Increased for benchmarks performance.
1✔
248
    DefaultHttp2RemoteFlowController controller =
1✔
249
        new DefaultHttp2RemoteFlowController(connection, dist);
250
    connection.remote().flowController(controller);
1✔
251
    final KeepAliveEnforcer keepAliveEnforcer = new KeepAliveEnforcer(
1✔
252
        permitKeepAliveWithoutCalls, permitKeepAliveTimeInNanos, TimeUnit.NANOSECONDS);
253

254
    if (ticker == null) {
1✔
255
      ticker = Ticker.systemTicker();
×
256
    }
257

258
    RstStreamCounter rstStreamCounter
1✔
259
        = new RstStreamCounter(maxRstCount, maxRstPeriodNanos, ticker);
260
    // Create the local flow controller configured to auto-refill the connection window.
261
    connection.local().flowController(
1✔
262
        new DefaultHttp2LocalFlowController(connection, DEFAULT_WINDOW_UPDATE_RATIO, true));
263
    frameWriter = new WriteMonitoringFrameWriter(frameWriter, keepAliveEnforcer);
1✔
264
    Http2ConnectionEncoder encoder =
1✔
265
        new DefaultHttp2ConnectionEncoder(connection, frameWriter);
266
    encoder = new Http2ControlFrameLimitEncoder(encoder, 10000);
1✔
267
    encoder = new Http2RstCounterEncoder(encoder, rstStreamCounter);
1✔
268
    Http2ConnectionDecoder decoder = new DefaultHttp2ConnectionDecoder(connection, encoder,
1✔
269
        frameReader);
270

271
    Http2Settings settings = new Http2Settings();
1✔
272
    settings.initialWindowSize(flowControlWindow);
1✔
273
    settings.maxConcurrentStreams(maxStreams);
1✔
274
    settings.maxHeaderListSize(maxHeaderListSize);
1✔
275

276
    return new NettyServerHandler(
1✔
277
        channelUnused,
278
        connection,
279
        transportListener,
280
        streamTracerFactories,
281
        transportTracer,
282
        decoder, encoder, settings,
283
        maxMessageSize,
284
        maxHeaderListSize,
285
        softLimitHeaderListSize,
286
        keepAliveTimeInNanos,
287
        keepAliveTimeoutInNanos,
288
        maxConnectionIdleInNanos,
289
        maxConnectionAgeInNanos, maxConnectionAgeGraceInNanos,
290
        keepAliveEnforcer,
291
        autoFlowControl,
292
        rstStreamCounter,
293
        eagAttributes, ticker);
294
  }
295

296
  /**
   * Wires the handler to an already-assembled connection, encoder and decoder.
   *
   * <p>Registers a connection listener that tells the keepalive enforcer (and the idle manager,
   * when enabled) whenever the connection gains its first active stream or loses its last one,
   * and installs a {@code FrameListener} on the decoder.
   */
  private NettyServerHandler(
      ChannelPromise channelUnused,
      final Http2Connection connection,
      ServerTransportListener transportListener,
      List<? extends ServerStreamTracer.Factory> streamTracerFactories,
      TransportTracer transportTracer,
      Http2ConnectionDecoder decoder,
      Http2ConnectionEncoder encoder,
      Http2Settings settings,
      int maxMessageSize,
      int maxHeaderListSize,
      int softLimitHeaderListSize,
      long keepAliveTimeInNanos,
      long keepAliveTimeoutInNanos,
      long maxConnectionIdleInNanos,
      long maxConnectionAgeInNanos,
      long maxConnectionAgeGraceInNanos,
      final KeepAliveEnforcer keepAliveEnforcer,
      boolean autoFlowControl,
      RstStreamCounter rstStreamCounter,
      Attributes eagAttributes,
      Ticker ticker) {
    super(
        channelUnused,
        decoder,
        encoder,
        settings,
        new ServerChannelLogger(),
        autoFlowControl,
        null,
        ticker,
        maxHeaderListSize,
        softLimitHeaderListSize);

    // The idle manager only exists when an idle limit is configured.
    final MaxConnectionIdleManager idleManager =
        maxConnectionIdleInNanos == MAX_CONNECTION_IDLE_NANOS_DISABLED
            ? null
            : new MaxConnectionIdleManager(maxConnectionIdleInNanos);

    connection.addListener(new Http2ConnectionAdapter() {
      @Override
      public void onStreamActive(Http2Stream stream) {
        // Only the transition from zero to one active stream matters.
        if (connection.numActiveStreams() != 1) {
          return;
        }
        keepAliveEnforcer.onTransportActive();
        if (idleManager != null) {
          idleManager.onTransportActive();
        }
      }

      @Override
      public void onStreamClosed(Http2Stream stream) {
        // Only the transition to zero active streams matters.
        if (connection.numActiveStreams() != 0) {
          return;
        }
        keepAliveEnforcer.onTransportIdle();
        if (idleManager != null) {
          idleManager.onTransportIdle();
        }
      }
    });

    checkArgument(maxMessageSize >= 0, "maxMessageSize must be non-negative: %s", maxMessageSize);
    this.maxMessageSize = maxMessageSize;
    this.keepAliveTimeInNanos = keepAliveTimeInNanos;
    this.keepAliveTimeoutInNanos = keepAliveTimeoutInNanos;
    this.maxConnectionIdleManager = idleManager;
    this.maxConnectionAgeInNanos = maxConnectionAgeInNanos;
    this.maxConnectionAgeGraceInNanos = maxConnectionAgeGraceInNanos;
    this.keepAliveEnforcer = checkNotNull(keepAliveEnforcer, "keepAliveEnforcer");
    this.rstStreamCounter = rstStreamCounter;
    this.eagAttributes = checkNotNull(eagAttributes, "eagAttributes");

    this.streamKey = encoder.connection().newKey();
    this.transportListener = checkNotNull(transportListener, "transportListener");
    this.streamTracerFactories = checkNotNull(streamTracerFactories, "streamTracerFactories");
    this.transportTracer = checkNotNull(transportTracer, "transportTracer");
    // Set the frame listener on the decoder.
    decoder().frameListener(new FrameListener());
  }
1✔
377

378
  @Nullable
379
  Throwable connectionError() {
380
    return connectionError;
1✔
381
  }
382

383
  @Override
384
  public void handlerAdded(final ChannelHandlerContext ctx) throws Exception {
385
    serverWriteQueue = new WriteQueue(ctx.channel());
1✔
386

387
    // init max connection age monitor
388
    if (maxConnectionAgeInNanos != MAX_CONNECTION_AGE_NANOS_DISABLED) {
1✔
389
      maxConnectionAgeMonitor = ctx.executor().schedule(
1✔
390
          new LogExceptionRunnable(new Runnable() {
1✔
391
            @Override
392
            public void run() {
393
              if (gracefulShutdown == null) {
1✔
394
                gracefulShutdown = new GracefulShutdown("max_age", maxConnectionAgeGraceInNanos);
1✔
395
                gracefulShutdown.start(ctx);
1✔
396
                ctx.flush();
1✔
397
              }
398
            }
1✔
399
          }),
400
          maxConnectionAgeInNanos,
401
          TimeUnit.NANOSECONDS);
402
    }
403

404
    if (maxConnectionIdleManager != null) {
1✔
405
      maxConnectionIdleManager.start(new Runnable() {
1✔
406
        @Override
407
        public void run() {
408
          if (gracefulShutdown == null) {
1✔
409
            gracefulShutdown = new GracefulShutdown("max_idle", null);
1✔
410
            gracefulShutdown.start(ctx);
1✔
411
            ctx.flush();
1✔
412
          }
413
        }
1✔
414
      }, ctx.executor());
1✔
415
    }
416

417
    if (keepAliveTimeInNanos != SERVER_KEEPALIVE_TIME_NANOS_DISABLED) {
1✔
418
      keepAliveManager = new KeepAliveManager(new KeepAlivePinger(ctx), ctx.executor(),
1✔
419
          keepAliveTimeInNanos, keepAliveTimeoutInNanos, true /* keepAliveDuringTransportIdle */);
420
      keepAliveManager.onTransportStarted();
1✔
421
    }
422

423
    assert encoder().connection().equals(decoder().connection());
1✔
424
    transportTracer.setFlowControlWindowReader(new Utils.FlowControlReader(encoder().connection()));
1✔
425

426
    super.handlerAdded(ctx);
1✔
427
  }
1✔
428

429
  private void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers)
430
      throws Http2Exception {
431
    try {
432
      // Connection-specific header fields makes a request malformed. Ideally this would be handled
433
      // by Netty. RFC 7540 section 8.1.2.2
434
      if (!DISABLE_CONNECTION_HEADER_CHECK && headers.contains(CONNECTION)) {
1✔
435
        resetStream(ctx, streamId, Http2Error.PROTOCOL_ERROR.code(), ctx.newPromise());
×
436
        return;
×
437
      }
438

439
      if (headers.authority() == null) {
1✔
440
        List<CharSequence> hosts = headers.getAll(HOST);
1✔
441
        if (hosts.size() > 1) {
1✔
442
          // RFC 7230 section 5.4
443
          respondWithHttpError(ctx, streamId, 400, Status.Code.INTERNAL,
1✔
444
              "Multiple host headers");
445
          return;
1✔
446
        }
447
        if (!hosts.isEmpty()) {
1✔
448
          headers.add(AUTHORITY.value(), hosts.get(0));
1✔
449
        }
450
      }
451
      headers.remove(HOST);
1✔
452

453
      // Remove the leading slash of the path and get the fully qualified method name
454
      CharSequence path = headers.path();
1✔
455

456
      if (path == null) {
1✔
457
        respondWithHttpError(ctx, streamId, 404, Status.Code.UNIMPLEMENTED,
1✔
458
            "Expected path but is missing");
459
        return;
1✔
460
      }
461

462
      if (path.charAt(0) != '/') {
1✔
463
        respondWithHttpError(ctx, streamId, 404, Status.Code.UNIMPLEMENTED,
1✔
464
            String.format("Expected path to start with /: %s", path));
1✔
465
        return;
1✔
466
      }
467

468
      String method = path.subSequence(1, path.length()).toString();
1✔
469

470
      // Verify that the Content-Type is correct in the request.
471
      CharSequence contentType = headers.get(CONTENT_TYPE_HEADER);
1✔
472
      if (contentType == null) {
1✔
473
        respondWithHttpError(
1✔
474
            ctx, streamId, 415, Status.Code.INTERNAL, "Content-Type is missing from the request");
475
        return;
1✔
476
      }
477
      String contentTypeString = contentType.toString();
1✔
478
      if (!GrpcUtil.isGrpcContentType(contentTypeString)) {
1✔
479
        respondWithHttpError(ctx, streamId, 415, Status.Code.INTERNAL,
1✔
480
            String.format("Content-Type '%s' is not supported", contentTypeString));
1✔
481
        return;
1✔
482
      }
483

484
      if (!HTTP_METHOD.contentEquals(headers.method())) {
1✔
485
        Http2Headers extraHeaders = new DefaultHttp2Headers();
1✔
486
        extraHeaders.add(HttpHeaderNames.ALLOW, HTTP_METHOD);
1✔
487
        respondWithHttpError(ctx, streamId, 405, Status.Code.INTERNAL,
1✔
488
            String.format("Method '%s' is not supported", headers.method()), extraHeaders);
1✔
489
        return;
1✔
490
      }
491

492
      int h2HeadersSize = Utils.getH2HeadersSize(headers);
1✔
493
      if (Utils.shouldRejectOnMetadataSizeSoftLimitExceeded(
1✔
494
              h2HeadersSize, softLimitHeaderListSize, maxHeaderListSize)) {
495
        respondWithHttpError(ctx, streamId, 431, Status.Code.RESOURCE_EXHAUSTED, String.format(
×
496
                "Client Headers of size %d exceeded Metadata size soft limit: %d",
497
                h2HeadersSize,
×
498
                softLimitHeaderListSize));
×
499
        return;
×
500
      }
501

502
      if (!teWarningLogged && !TE_TRAILERS.contentEquals(headers.get(TE_HEADER))) {
1✔
503
        logger.warning(String.format("Expected header TE: %s, but %s is received. This means "
1✔
504
                + "some intermediate proxy may not support trailers",
505
            TE_TRAILERS, headers.get(TE_HEADER)));
1✔
506
        teWarningLogged = true;
1✔
507
      }
508

509
      // The Http2Stream object was put by AbstractHttp2ConnectionHandler before calling this
510
      // method.
511
      Http2Stream http2Stream = requireHttp2Stream(streamId);
1✔
512

513
      Metadata metadata = Utils.convertHeaders(headers);
1✔
514
      StatsTraceContext statsTraceCtx =
1✔
515
          StatsTraceContext.newServerContext(streamTracerFactories, method, metadata);
1✔
516

517
      NettyServerStream.TransportState state = new NettyServerStream.TransportState(
1✔
518
          this,
519
          ctx.channel().eventLoop(),
1✔
520
          http2Stream,
521
          maxMessageSize,
522
          statsTraceCtx,
523
          transportTracer,
524
          method);
525

526
      try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.onHeadersRead")) {
1✔
527
        PerfMark.attachTag(state.tag());
1✔
528
        String authority = getOrUpdateAuthority((AsciiString) headers.authority());
1✔
529
        NettyServerStream stream = new NettyServerStream(
1✔
530
            ctx.channel(),
1✔
531
            state,
532
            attributes,
533
            authority,
534
            statsTraceCtx);
535
        transportListener.streamCreated(stream, method, metadata);
1✔
536
        state.onStreamAllocated();
1✔
537
        http2Stream.setProperty(streamKey, state);
1✔
538
      }
539
    } catch (Exception e) {
×
540
      logger.log(Level.WARNING, "Exception in onHeadersRead()", e);
×
541
      // Throw an exception that will get handled by onStreamError.
542
      throw newStreamException(streamId, e);
×
543
    }
1✔
544
  }
1✔
545

546
  private String getOrUpdateAuthority(AsciiString authority) {
547
    if (authority == null) {
1✔
548
      return null;
1✔
549
    } else if (!authority.equals(lastKnownAuthority)) {
1✔
550
      lastKnownAuthority = authority;
1✔
551
    }
552

553
    // AsciiString.toString() is internally cached, so subsequent calls will not
554
    // result in recomputing the String representation of lastKnownAuthority.
555
    return lastKnownAuthority.toString();
1✔
556
  }
557

558
  private void onDataRead(int streamId, ByteBuf data, int padding, boolean endOfStream)
559
      throws Http2Exception {
560
    flowControlPing().onDataRead(data.readableBytes(), padding);
1✔
561
    try {
562
      NettyServerStream.TransportState stream = serverStream(requireHttp2Stream(streamId));
1✔
563
      if (stream == null) {
1✔
564
        return;
1✔
565
      }
566
      try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.onDataRead")) {
1✔
567
        PerfMark.attachTag(stream.tag());
1✔
568
        stream.inboundDataReceived(data, endOfStream);
1✔
569
      }
570
    } catch (Throwable e) {
×
571
      logger.log(Level.WARNING, "Exception in onDataRead()", e);
×
572
      // Throw an exception that will get handled by onStreamError.
573
      throw newStreamException(streamId, e);
×
574
    }
1✔
575
  }
1✔
576

577
  private void onRstStreamRead(int streamId, long errorCode) throws Http2Exception {
578
    Http2Exception tooManyRstStream = rstStreamCounter.countRstStream();
1✔
579
    if (tooManyRstStream != null) {
1✔
580
      throw tooManyRstStream;
1✔
581
    }
582

583
    try {
584
      NettyServerStream.TransportState stream = serverStream(connection().stream(streamId));
1✔
585
      if (stream != null) {
1✔
586
        try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.onRstStreamRead")) {
1✔
587
          PerfMark.attachTag(stream.tag());
1✔
588
          stream.transportReportStatus(
1✔
589
              Status.CANCELLED.withDescription("RST_STREAM received for code " + errorCode));
1✔
590
        }
591
      }
592
    } catch (Throwable e) {
×
593
      logger.log(Level.WARNING, "Exception in onRstStreamRead()", e);
×
594
      // Throw an exception that will get handled by onStreamError.
595
      throw newStreamException(streamId, e);
×
596
    }
1✔
597
  }
1✔
598

599
  @Override
600
  protected void onConnectionError(ChannelHandlerContext ctx, boolean outbound, Throwable cause,
601
      Http2Exception http2Ex) {
602
    logger.log(Level.FINE, "Connection Error", cause);
1✔
603
    connectionError = cause;
1✔
604
    super.onConnectionError(ctx, outbound, cause, http2Ex);
1✔
605
  }
1✔
606

607
  @Override
608
  protected void onStreamError(ChannelHandlerContext ctx, boolean outbound, Throwable cause,
609
      StreamException http2Ex) {
610
    NettyServerStream.TransportState serverStream = serverStream(
1✔
611
        connection().stream(Http2Exception.streamId(http2Ex)));
1✔
612
    Level level = Level.WARNING;
1✔
613
    if (serverStream == null && http2Ex.error() == Http2Error.STREAM_CLOSED) {
1✔
614
      level = Level.FINE;
1✔
615
    }
616
    logger.log(level, "Stream Error", cause);
1✔
617
    Tag tag = serverStream != null ? serverStream.tag() : PerfMark.createTag();
1✔
618
    try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.onStreamError")) {
1✔
619
      PerfMark.attachTag(tag);
1✔
620
      if (serverStream != null) {
1✔
621
        serverStream.transportReportStatus(Utils.statusFromThrowable(cause));
1✔
622
      }
623
      // TODO(ejona): Abort the stream by sending headers to help the client with debugging.
624
      // Delegate to the base class to send a RST_STREAM.
625
      super.onStreamError(ctx, outbound, cause, http2Ex);
1✔
626
    }
627
  }
1✔
628

629
  @Override
630
  public void handleProtocolNegotiationCompleted(
631
      Attributes attrs, InternalChannelz.Security securityInfo) {
632
    negotiationAttributes = attrs;
1✔
633
    this.securityInfo = securityInfo;
1✔
634
    super.handleProtocolNegotiationCompleted(attrs, securityInfo);
1✔
635
    NettyClientHandler.writeBufferingAndRemove(ctx().channel());
1✔
636
  }
1✔
637

638
  @Override
639
  public Attributes getEagAttributes() {
640
    return eagAttributes;
1✔
641
  }
642

643
  InternalChannelz.Security getSecurityInfo() {
644
    return securityInfo;
1✔
645
  }
646

647
  @VisibleForTesting
648
  KeepAliveManager getKeepAliveManagerForTest() {
649
    return keepAliveManager;
1✔
650
  }
651

652
  @VisibleForTesting
653
  void setKeepAliveManagerForTest(KeepAliveManager keepAliveManager) {
654
    this.keepAliveManager = keepAliveManager;
1✔
655
  }
1✔
656

657
  /**
658
   * Handler for the Channel shutting down.
659
   */
660
  @Override
661
  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
662
    try {
663
      if (keepAliveManager != null) {
1✔
664
        keepAliveManager.onTransportTermination();
1✔
665
      }
666
      if (maxConnectionIdleManager != null) {
1✔
667
        maxConnectionIdleManager.onTransportTermination();
1✔
668
      }
669
      if (maxConnectionAgeMonitor != null) {
1✔
670
        maxConnectionAgeMonitor.cancel(false);
1✔
671
      }
672
      final Status status =
1✔
673
          Status.UNAVAILABLE.withDescription("connection terminated for unknown reason");
1✔
674
      // Any streams that are still active must be closed
675
      connection().forEachActiveStream(new Http2StreamVisitor() {
1✔
676
        @Override
677
        public boolean visit(Http2Stream stream) throws Http2Exception {
678
          NettyServerStream.TransportState serverStream = serverStream(stream);
1✔
679
          if (serverStream != null) {
1✔
680
            serverStream.transportReportStatus(status);
1✔
681
          }
682
          return true;
1✔
683
        }
684
      });
685
    } finally {
686
      super.channelInactive(ctx);
1✔
687
    }
688
  }
1✔
689

690
  WriteQueue getWriteQueue() {
691
    return serverWriteQueue;
1✔
692
  }
693

694
  /** Handler for commands sent from the stream. */
695
  @Override
696
  public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
697
      throws Exception {
698
    if (msg instanceof SendGrpcFrameCommand) {
1✔
699
      sendGrpcFrame(ctx, (SendGrpcFrameCommand) msg, promise);
1✔
700
    } else if (msg instanceof SendResponseHeadersCommand) {
1✔
701
      sendResponseHeaders(ctx, (SendResponseHeadersCommand) msg, promise);
1✔
702
    } else if (msg instanceof CancelServerStreamCommand) {
1✔
703
      cancelStream(ctx, (CancelServerStreamCommand) msg, promise);
1✔
704
    } else if (msg instanceof GracefulServerCloseCommand) {
1✔
705
      gracefulClose(ctx, (GracefulServerCloseCommand) msg, promise);
1✔
706
    } else if (msg instanceof ForcefulCloseCommand) {
1✔
707
      forcefulClose(ctx, (ForcefulCloseCommand) msg, promise);
1✔
708
    } else {
709
      AssertionError e =
×
710
          new AssertionError("Write called for unexpected type: " + msg.getClass().getName());
×
711
      ReferenceCountUtil.release(msg);
×
712
      promise.setFailure(e);
×
713
      throw e;
×
714
    }
715
  }
1✔
716

717
  @Override
718
  public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
719
    gracefulClose(ctx, new GracefulServerCloseCommand("app_requested"), promise);
1✔
720
    ctx.flush();
1✔
721
  }
1✔
722

723
  /**
724
   * Returns the given processed bytes back to inbound flow control.
725
   */
726
  void returnProcessedBytes(Http2Stream http2Stream, int bytes) {
727
    try {
728
      decoder().flowController().consumeBytes(http2Stream, bytes);
1✔
729
    } catch (Http2Exception e) {
×
730
      throw new RuntimeException(e);
×
731
    }
1✔
732
  }
1✔
733

734
  private void closeStreamWhenDone(ChannelPromise promise, Http2Stream stream) {
735
    promise.addListener(
1✔
736
        new ChannelFutureListener() {
1✔
737
          @Override
738
          public void operationComplete(ChannelFuture future) {
739
            serverStream(stream).complete();
1✔
740
          }
1✔
741
        });
742
  }
1✔
743

744
  private static void streamGone(int streamId, ChannelPromise promise) {
745
    promise.setFailure(
1✔
746
        new IllegalStateException(
747
            "attempting to write to stream " + streamId + " that no longer exists") {
1✔
748
          @Override
749
          public synchronized Throwable fillInStackTrace() {
750
            return this;
1✔
751
          }
752
        });
753
  }
1✔
754

755
  /** Sends the given gRPC frame to the client. */
756
  private void sendGrpcFrame(
757
      ChannelHandlerContext ctx, SendGrpcFrameCommand cmd, ChannelPromise promise)
758
      throws Http2Exception {
759
    try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.sendGrpcFrame")) {
1✔
760
      PerfMark.attachTag(cmd.stream().tag());
1✔
761
      PerfMark.linkIn(cmd.getLink());
1✔
762
      int streamId = cmd.stream().id();
1✔
763
      Http2Stream stream = connection().stream(streamId);
1✔
764
      if (stream == null) {
1✔
765
        cmd.release();
1✔
766
        streamGone(streamId, promise);
1✔
767
        return;
1✔
768
      }
769
      if (cmd.endStream()) {
1✔
770
        closeStreamWhenDone(promise, stream);
×
771
      }
772
      // Call the base class to write the HTTP/2 DATA frame.
773
      encoder().writeData(ctx, streamId, cmd.content(), 0, cmd.endStream(), promise);
1✔
774
    }
775
  }
1✔
776

777
  /**
778
   * Sends the response headers to the client.
779
   */
780
  private void sendResponseHeaders(ChannelHandlerContext ctx, SendResponseHeadersCommand cmd,
781
      ChannelPromise promise) throws Http2Exception {
782
    try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.sendResponseHeaders")) {
1✔
783
      PerfMark.attachTag(cmd.stream().tag());
1✔
784
      PerfMark.linkIn(cmd.getLink());
1✔
785
      int streamId = cmd.stream().id();
1✔
786
      Http2Stream stream = connection().stream(streamId);
1✔
787
      if (stream == null) {
1✔
788
        streamGone(streamId, promise);
1✔
789
        return;
1✔
790
      }
791
      if (cmd.endOfStream()) {
1✔
792
        closeStreamWhenDone(promise, stream);
1✔
793
      }
794
      encoder().writeHeaders(ctx, streamId, cmd.headers(), 0, cmd.endOfStream(), promise);
1✔
795
    }
796
  }
1✔
797

798
  private void cancelStream(ChannelHandlerContext ctx, CancelServerStreamCommand cmd,
799
      ChannelPromise promise) {
800
    try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.cancelStream")) {
1✔
801
      PerfMark.attachTag(cmd.stream().tag());
1✔
802
      PerfMark.linkIn(cmd.getLink());
1✔
803
      // Notify the listener if we haven't already.
804
      cmd.stream().transportReportStatus(cmd.reason());
1✔
805

806
      // Now we need to decide how we're going to notify the peer that this stream is closed.
807
      // If possible, it's nice to inform the peer _why_ this stream was cancelled by sending
808
      // a structured headers frame.
809
      if (shouldCloseStreamWithHeaders(cmd, connection())) {
1✔
810
        Metadata md = new Metadata();
1✔
811
        md.put(InternalStatus.CODE_KEY, cmd.reason());
1✔
812
        if (cmd.reason().getDescription() != null) {
1✔
813
          md.put(InternalStatus.MESSAGE_KEY, cmd.reason().getDescription());
1✔
814
        }
815
        Http2Headers headers = Utils.convertServerHeaders(md);
1✔
816
        encoder().writeHeaders(
1✔
817
            ctx, cmd.stream().id(), headers, /* padding = */ 0, /* endStream = */ true, promise);
1✔
818
      } else {
1✔
819
        // Terminate the stream.
820
        encoder().writeRstStream(ctx, cmd.stream().id(), Http2Error.CANCEL.code(), promise);
1✔
821
      }
822
    }
823
  }
1✔
824

825
  // Determine whether a CancelServerStreamCommand should try to close the stream with a
826
  // HEADERS or a RST_STREAM frame. The caller has some influence over this (they can
827
  // configure cmd.wantsHeaders()). The state of the stream also has an influence: we
828
  // only try to send HEADERS if the stream exists and hasn't already sent any headers.
829
  private static boolean shouldCloseStreamWithHeaders(
830
          CancelServerStreamCommand cmd, Http2Connection conn) {
831
    if (!cmd.wantsHeaders()) {
1✔
832
      return false;
1✔
833
    }
834
    Http2Stream stream = conn.stream(cmd.stream().id());
1✔
835
    return stream != null && !stream.isHeadersSent();
1✔
836
  }
837

838
  private void gracefulClose(final ChannelHandlerContext ctx, final GracefulServerCloseCommand msg,
839
      ChannelPromise promise) throws Exception {
840
    // Ideally we'd adjust a pre-existing graceful shutdown's grace period to at least what is
841
    // requested here. But that's an edge case and seems bug-prone.
842
    if (gracefulShutdown == null) {
1✔
843
      Long graceTimeInNanos = null;
1✔
844
      if (msg.getGraceTimeUnit() != null) {
1✔
845
        graceTimeInNanos = msg.getGraceTimeUnit().toNanos(msg.getGraceTime());
1✔
846
      }
847
      gracefulShutdown = new GracefulShutdown(msg.getGoAwayDebugString(), graceTimeInNanos);
1✔
848
      gracefulShutdown.start(ctx);
1✔
849
    }
850
    promise.setSuccess();
1✔
851
  }
1✔
852

853
  private void forcefulClose(final ChannelHandlerContext ctx, final ForcefulCloseCommand msg,
854
      ChannelPromise promise) throws Exception {
855
    super.close(ctx, promise);
1✔
856
    connection().forEachActiveStream(new Http2StreamVisitor() {
1✔
857
      @Override
858
      public boolean visit(Http2Stream stream) throws Http2Exception {
859
        NettyServerStream.TransportState serverStream = serverStream(stream);
1✔
860
        if (serverStream != null) {
1✔
861
          try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.forcefulClose")) {
1✔
862
            PerfMark.attachTag(serverStream.tag());
1✔
863
            PerfMark.linkIn(msg.getLink());
1✔
864
            serverStream.transportReportStatus(msg.getStatus());
1✔
865
            resetStream(ctx, stream.id(), Http2Error.CANCEL.code(), ctx.newPromise());
1✔
866
          }
867
        }
868
        stream.close();
1✔
869
        return true;
1✔
870
      }
871
    });
872
  }
1✔
873

874
  private void respondWithHttpError(
875
      ChannelHandlerContext ctx, int streamId, int code, Status.Code statusCode, String msg) {
876
    respondWithHttpError(ctx, streamId, code, statusCode, msg, EmptyHttp2Headers.INSTANCE);
1✔
877
  }
1✔
878

879
  private void respondWithHttpError(
880
      ChannelHandlerContext ctx, int streamId, int code, Status.Code statusCode, String msg,
881
      Http2Headers extraHeaders) {
882
    Metadata metadata = new Metadata();
1✔
883
    metadata.put(InternalStatus.CODE_KEY, statusCode.toStatus());
1✔
884
    metadata.put(InternalStatus.MESSAGE_KEY, msg);
1✔
885
    byte[][] serialized = InternalMetadata.serialize(metadata);
1✔
886

887
    Http2Headers headers = new DefaultHttp2Headers(true, serialized.length / 2)
1✔
888
        .status("" + code)
1✔
889
        .set(CONTENT_TYPE_HEADER, "text/plain; charset=utf-8");
1✔
890
    for (int i = 0; i < serialized.length; i += 2) {
1✔
891
      headers.add(new AsciiString(serialized[i], false), new AsciiString(serialized[i + 1], false));
1✔
892
    }
893
    headers.add(extraHeaders);
1✔
894
    encoder().writeHeaders(ctx, streamId, headers, 0, false, ctx.newPromise());
1✔
895
    ByteBuf msgBuf = ByteBufUtil.writeUtf8(ctx.alloc(), msg);
1✔
896
    encoder().writeData(ctx, streamId, msgBuf, 0, true, ctx.newPromise());
1✔
897
  }
1✔
898

899
  private Http2Stream requireHttp2Stream(int streamId) {
900
    Http2Stream stream = connection().stream(streamId);
1✔
901
    if (stream == null) {
1✔
902
      // This should never happen.
903
      throw new AssertionError("Stream does not exist: " + streamId);
×
904
    }
905
    return stream;
1✔
906
  }
907

908
  /**
909
   * Returns the server stream associated to the given HTTP/2 stream object.
910
   */
911
  private NettyServerStream.TransportState serverStream(Http2Stream stream) {
912
    return stream == null ? null : (NettyServerStream.TransportState) stream.getProperty(streamKey);
1✔
913
  }
914

915
  private Http2Exception newStreamException(int streamId, Throwable cause) {
916
    return Http2Exception.streamError(
×
917
        streamId, Http2Error.INTERNAL_ERROR, cause, Strings.nullToEmpty(cause.getMessage()));
×
918
  }
919

920
  private class FrameListener extends Http2FrameAdapter {
1✔
921
    private boolean firstSettings = true;
1✔
922

923
    @Override
924
    public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) {
925
      if (firstSettings) {
1✔
926
        firstSettings = false;
1✔
927
        // Delay transportReady until we see the client's HTTP handshake, for coverage with
928
        // handshakeTimeout
929
        attributes = transportListener.transportReady(negotiationAttributes);
1✔
930
      }
931
    }
1✔
932

933
    @Override
934
    public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
935
        boolean endOfStream) throws Http2Exception {
936
      if (keepAliveManager != null) {
1✔
937
        keepAliveManager.onDataReceived();
1✔
938
      }
939
      NettyServerHandler.this.onDataRead(streamId, data, padding, endOfStream);
1✔
940
      return padding;
1✔
941
    }
942

943
    @Override
944
    public void onHeadersRead(ChannelHandlerContext ctx,
945
        int streamId,
946
        Http2Headers headers,
947
        int streamDependency,
948
        short weight,
949
        boolean exclusive,
950
        int padding,
951
        boolean endStream) throws Http2Exception {
952
      if (keepAliveManager != null) {
1✔
953
        keepAliveManager.onDataReceived();
1✔
954
      }
955
      NettyServerHandler.this.onHeadersRead(ctx, streamId, headers);
1✔
956
      if (endStream) {
1✔
957
        NettyServerHandler.this.onDataRead(streamId, Unpooled.EMPTY_BUFFER, 0, endStream);
×
958
      }
959
    }
1✔
960

961
    @Override
962
    public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode)
963
        throws Http2Exception {
964
      if (keepAliveManager != null) {
1✔
965
        keepAliveManager.onDataReceived();
1✔
966
      }
967
      NettyServerHandler.this.onRstStreamRead(streamId, errorCode);
1✔
968
    }
1✔
969

970
    @Override
971
    public void onPingRead(ChannelHandlerContext ctx, long data) throws Http2Exception {
972
      if (keepAliveManager != null) {
1✔
973
        keepAliveManager.onDataReceived();
1✔
974
      }
975
      if (!keepAliveEnforcer.pingAcceptable()) {
1✔
976
        ByteBuf debugData = ByteBufUtil.writeAscii(ctx.alloc(), "too_many_pings");
1✔
977
        goAway(ctx, connection().remote().lastStreamCreated(), Http2Error.ENHANCE_YOUR_CALM.code(),
1✔
978
            debugData, ctx.newPromise());
1✔
979
        Status status = Status.RESOURCE_EXHAUSTED.withDescription("Too many pings from client");
1✔
980
        try {
981
          forcefulClose(ctx, new ForcefulCloseCommand(status), ctx.newPromise());
1✔
982
        } catch (Exception ex) {
×
983
          onError(ctx, /* outbound= */ true, ex);
×
984
        }
1✔
985
      }
986
    }
1✔
987

988
    @Override
989
    public void onPingAckRead(ChannelHandlerContext ctx, long data) throws Http2Exception {
990
      if (keepAliveManager != null) {
1✔
991
        keepAliveManager.onDataReceived();
1✔
992
      }
993
      if (data == flowControlPing().payload()) {
1✔
994
        flowControlPing().updateWindow();
1✔
995
        logger.log(Level.FINE, "Window: {0}",
1✔
996
            decoder().flowController().initialWindowSize(connection().connectionStream()));
1✔
997
      } else if (data == GRACEFUL_SHUTDOWN_PING) {
1✔
998
        if (gracefulShutdown == null) {
1✔
999
          // this should never happen
1000
          logger.warning("Received GRACEFUL_SHUTDOWN_PING Ack but gracefulShutdown is null");
×
1001
        } else {
1002
          gracefulShutdown.secondGoAwayAndClose(ctx);
1✔
1003
        }
1004
      } else if (data != KEEPALIVE_PING) {
1✔
1005
        logger.warning("Received unexpected ping ack. No ping outstanding");
×
1006
      }
1007
    }
1✔
1008
  }
1009

1010
  private final class KeepAlivePinger implements KeepAliveManager.KeepAlivePinger {
1011
    final ChannelHandlerContext ctx;
1012

1013
    KeepAlivePinger(ChannelHandlerContext ctx) {
1✔
1014
      this.ctx = ctx;
1✔
1015
    }
1✔
1016

1017
    @Override
1018
    public void ping() {
1019
      ChannelFuture pingFuture = encoder().writePing(
1✔
1020
          ctx, false /* isAck */, KEEPALIVE_PING, ctx.newPromise());
1✔
1021
      ctx.flush();
1✔
1022
      pingFuture.addListener(new ChannelFutureListener() {
1✔
1023
        @Override
1024
        public void operationComplete(ChannelFuture future) throws Exception {
1025
          if (future.isSuccess()) {
1✔
1026
            transportTracer.reportKeepAliveSent();
1✔
1027
          }
1028
        }
1✔
1029
      });
1030
    }
1✔
1031

1032
    @Override
1033
    public void onPingTimeout() {
1034
      try {
1035
        forcefulClose(
1✔
1036
            ctx,
1037
            new ForcefulCloseCommand(Status.UNAVAILABLE
1038
                .withDescription("Keepalive failed. The connection is likely gone")),
1✔
1039
            ctx.newPromise());
1✔
1040
      } catch (Exception ex) {
×
1041
        try {
1042
          exceptionCaught(ctx, ex);
×
1043
        } catch (Exception ex2) {
×
1044
          logger.log(Level.WARNING, "Exception while propagating exception", ex2);
×
1045
          logger.log(Level.WARNING, "Original failure", ex);
×
1046
        }
×
1047
      }
1✔
1048
    }
1✔
1049
  }
1050

1051
  private final class GracefulShutdown {
1052
    String goAwayMessage;
1053

1054
    /**
1055
     * The grace time between starting graceful shutdown and closing the netty channel,
1056
     * {@code null} is unspecified.
1057
     */
1058
    @CheckForNull
1059
    Long graceTimeInNanos;
1060

1061
    /**
1062
     * True if ping is Acked or ping is timeout.
1063
     */
1064
    boolean pingAckedOrTimeout;
1065

1066
    Future<?> pingFuture;
1067

1068
    GracefulShutdown(String goAwayMessage,
1069
        @Nullable Long graceTimeInNanos) {
1✔
1070
      this.goAwayMessage = goAwayMessage;
1✔
1071
      this.graceTimeInNanos = graceTimeInNanos;
1✔
1072
    }
1✔
1073

1074
    /**
1075
     * Sends out first GOAWAY and ping, and schedules second GOAWAY and close.
1076
     */
1077
    void start(final ChannelHandlerContext ctx) {
1078
      goAway(
1✔
1079
          ctx,
1080
          Integer.MAX_VALUE,
1081
          Http2Error.NO_ERROR.code(),
1✔
1082
          ByteBufUtil.writeAscii(ctx.alloc(), goAwayMessage),
1✔
1083
          ctx.newPromise());
1✔
1084

1085
      pingFuture = ctx.executor().schedule(
1✔
1086
          new Runnable() {
1✔
1087
            @Override
1088
            public void run() {
1089
              secondGoAwayAndClose(ctx);
1✔
1090
            }
1✔
1091
          },
1092
          GRACEFUL_SHUTDOWN_PING_TIMEOUT_NANOS,
1✔
1093
          TimeUnit.NANOSECONDS);
1094

1095
      encoder().writePing(ctx, false /* isAck */, GRACEFUL_SHUTDOWN_PING, ctx.newPromise());
1✔
1096
    }
1✔
1097

1098
    void secondGoAwayAndClose(ChannelHandlerContext ctx) {
1099
      if (pingAckedOrTimeout) {
1✔
1100
        return;
×
1101
      }
1102
      pingAckedOrTimeout = true;
1✔
1103

1104
      checkNotNull(pingFuture, "pingFuture");
1✔
1105
      pingFuture.cancel(false);
1✔
1106

1107
      // send the second GOAWAY with last stream id
1108
      goAway(
1✔
1109
          ctx,
1110
          connection().remote().lastStreamCreated(),
1✔
1111
          Http2Error.NO_ERROR.code(),
1✔
1112
          ByteBufUtil.writeAscii(ctx.alloc(), goAwayMessage),
1✔
1113
          ctx.newPromise());
1✔
1114

1115
      // gracefully shutdown with specified grace time
1116
      long savedGracefulShutdownTimeMillis = gracefulShutdownTimeoutMillis();
1✔
1117
      long overriddenGraceTime = graceTimeOverrideMillis(savedGracefulShutdownTimeMillis);
1✔
1118
      try {
1119
        gracefulShutdownTimeoutMillis(overriddenGraceTime);
1✔
1120
        NettyServerHandler.super.close(ctx, ctx.newPromise());
1✔
1121
      } catch (Exception e) {
×
1122
        onError(ctx, /* outbound= */ true, e);
×
1123
      } finally {
1124
        gracefulShutdownTimeoutMillis(savedGracefulShutdownTimeMillis);
1✔
1125
      }
1126
    }
1✔
1127

1128
    private long graceTimeOverrideMillis(long originalMillis) {
1129
      if (graceTimeInNanos == null) {
1✔
1130
        return originalMillis;
1✔
1131
      }
1132
      if (graceTimeInNanos == MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE) {
1✔
1133
        // netty treats -1 as "no timeout"
1134
        return -1L;
1✔
1135
      }
1136
      return TimeUnit.NANOSECONDS.toMillis(graceTimeInNanos);
1✔
1137
    }
1138
  }
1139

1140
  // Use a frame writer so that we know when frames are through flow control and actually being
1141
  // written.
1142
  private static class WriteMonitoringFrameWriter extends DecoratingHttp2FrameWriter {
1143
    private final KeepAliveEnforcer keepAliveEnforcer;
1144

1145
    public WriteMonitoringFrameWriter(Http2FrameWriter delegate,
1146
        KeepAliveEnforcer keepAliveEnforcer) {
1147
      super(delegate);
1✔
1148
      this.keepAliveEnforcer = keepAliveEnforcer;
1✔
1149
    }
1✔
1150

1151
    @Override
1152
    public ChannelFuture writeData(ChannelHandlerContext ctx, int streamId, ByteBuf data,
1153
        int padding, boolean endStream, ChannelPromise promise) {
1154
      keepAliveEnforcer.resetCounters();
1✔
1155
      return super.writeData(ctx, streamId, data, padding, endStream, promise);
1✔
1156
    }
1157

1158
    @Override
1159
    public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
1160
        int padding, boolean endStream, ChannelPromise promise) {
1161
      keepAliveEnforcer.resetCounters();
1✔
1162
      return super.writeHeaders(ctx, streamId, headers, padding, endStream, promise);
1✔
1163
    }
1164

1165
    @Override
1166
    public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
1167
        int streamDependency, short weight, boolean exclusive, int padding, boolean endStream,
1168
        ChannelPromise promise) {
1169
      keepAliveEnforcer.resetCounters();
×
1170
      return super.writeHeaders(ctx, streamId, headers, streamDependency, weight, exclusive,
×
1171
          padding, endStream, promise);
1172
    }
1173
  }
1174

1175
  private static final class Http2RstCounterEncoder extends DecoratingHttp2ConnectionEncoder {
1176
    private final RstStreamCounter rstStreamCounter;
1177
    private Http2LifecycleManager lifecycleManager;
1178

1179
    Http2RstCounterEncoder(Http2ConnectionEncoder encoder, RstStreamCounter rstStreamCounter) {
1180
      super(encoder);
1✔
1181
      this.rstStreamCounter = rstStreamCounter;
1✔
1182
    }
1✔
1183

1184
    @Override
1185
    public void lifecycleManager(Http2LifecycleManager lifecycleManager) {
1186
      this.lifecycleManager = lifecycleManager;
1✔
1187
      super.lifecycleManager(lifecycleManager);
1✔
1188
    }
1✔
1189

1190
    @Override
1191
    public ChannelFuture writeRstStream(
1192
        ChannelHandlerContext ctx, int streamId, long errorCode, ChannelPromise promise) {
1193
      ChannelFuture future = super.writeRstStream(ctx, streamId, errorCode, promise);
1✔
1194
      // We want to count "induced" RST_STREAM, where the server sent a reset because of a malformed
1195
      // frame.
1196
      boolean normalRst
1✔
1197
          = errorCode == Http2Error.NO_ERROR.code() || errorCode == Http2Error.CANCEL.code();
1✔
1198
      if (!normalRst) {
1✔
1199
        Http2Exception tooManyRstStream = rstStreamCounter.countRstStream();
1✔
1200
        if (tooManyRstStream != null) {
1✔
1201
          lifecycleManager.onError(ctx, true, tooManyRstStream);
1✔
1202
          ctx.close();
1✔
1203
        }
1204
      }
1205
      return future;
1✔
1206
    }
1207
  }
1208

1209
  private static final class RstStreamCounter {
1210
    private final int maxRstCount;
1211
    private final long maxRstPeriodNanos;
1212
    private final Ticker ticker;
1213
    private int rstCount;
1214
    private long lastRstNanoTime;
1215

1216
    RstStreamCounter(int maxRstCount, long maxRstPeriodNanos, Ticker ticker) {
1✔
1217
      checkArgument(maxRstCount >= 0, "maxRstCount must be non-negative: %s", maxRstCount);
1✔
1218
      this.maxRstCount = maxRstCount;
1✔
1219
      this.maxRstPeriodNanos = maxRstPeriodNanos;
1✔
1220
      this.ticker = checkNotNull(ticker, "ticker");
1✔
1221
      this.lastRstNanoTime = ticker.read();
1✔
1222
    }
1✔
1223

1224
    /** Returns non-{@code null} when the connection should be killed by the caller. */
1225
    private Http2Exception countRstStream() {
1226
      if (maxRstCount == 0) {
1✔
1227
        return null;
1✔
1228
      }
1229
      long now = ticker.read();
1✔
1230
      if (now - lastRstNanoTime > maxRstPeriodNanos) {
1✔
1231
        lastRstNanoTime = now;
1✔
1232
        rstCount = 1;
1✔
1233
      } else {
1234
        rstCount++;
1✔
1235
        if (rstCount > maxRstCount) {
1✔
1236
          return new Http2Exception(Http2Error.ENHANCE_YOUR_CALM, "too_many_rststreams") {
1✔
1237
            @SuppressWarnings("UnsynchronizedOverridesSynchronized") // No memory accesses
1238
            @Override
1239
            public Throwable fillInStackTrace() {
1240
              // Avoid the CPU cycles, since the resets may be a CPU consumption attack
1241
              return this;
1✔
1242
            }
1243
          };
1244
        }
1245
      }
1246
      return null;
1✔
1247
    }
1248
  }
1249

1250
  private static class ServerChannelLogger extends ChannelLogger {
1251
    private static final Logger log = Logger.getLogger(ChannelLogger.class.getName());
1✔
1252

1253
    @Override
1254
    public void log(ChannelLogLevel level, String message) {
1255
      log.log(toJavaLogLevel(level), message);
1✔
1256
    }
1✔
1257

1258
    @Override
1259
    public void log(ChannelLogLevel level, String messageFormat, Object... args) {
1260
      log(level, MessageFormat.format(messageFormat, args));
1✔
1261
    }
1✔
1262
  }
1263

1264
  private static Level toJavaLogLevel(ChannelLogLevel level) {
1265
    switch (level) {
1✔
1266
      case ERROR:
1267
        return Level.FINE;
×
1268
      case WARNING:
1269
        return Level.FINER;
×
1270
      default:
1271
        return Level.FINEST;
1✔
1272
    }
1273
  }
1274
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc