• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #18908

20 Nov 2023 09:54PM CUT coverage: 88.231% (-0.02%) from 88.249%
#18908

push

github

ejona86
netty: Add option to limit RST_STREAM rate

The behavior purposefully mirrors that of Netty's
AbstractHttp2ConnectionHandlerBuilder.decoderEnforceMaxRstFramesPerWindow().
That API is not available to our code as we extend the
Http2ConnectionHandler, but we want our API to be able to delegate to
Netty's in the future if that ever becomes possible.

30311 of 34354 relevant lines covered (88.23%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.13
/../netty/src/main/java/io/grpc/netty/NettyServerTransport.java
1
/*
2
 * Copyright 2014 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.netty;
18

19
import com.google.common.annotations.VisibleForTesting;
20
import com.google.common.base.MoreObjects;
21
import com.google.common.base.Preconditions;
22
import com.google.common.collect.ImmutableList;
23
import com.google.common.util.concurrent.ListenableFuture;
24
import com.google.common.util.concurrent.SettableFuture;
25
import io.grpc.Attributes;
26
import io.grpc.InternalChannelz.SocketStats;
27
import io.grpc.InternalLogId;
28
import io.grpc.ServerStreamTracer;
29
import io.grpc.Status;
30
import io.grpc.internal.ServerTransport;
31
import io.grpc.internal.ServerTransportListener;
32
import io.grpc.internal.TransportTracer;
33
import io.netty.channel.Channel;
34
import io.netty.channel.ChannelFuture;
35
import io.netty.channel.ChannelFutureListener;
36
import io.netty.channel.ChannelHandler;
37
import io.netty.channel.ChannelPromise;
38
import io.netty.util.concurrent.Future;
39
import io.netty.util.concurrent.GenericFutureListener;
40
import java.io.IOException;
41
import java.net.SocketAddress;
42
import java.net.SocketException;
43
import java.util.List;
44
import java.util.concurrent.ScheduledExecutorService;
45
import java.util.logging.Level;
46
import java.util.logging.Logger;
47

48
/**
49
 * The Netty-based server transport.
50
 */
51
class NettyServerTransport implements ServerTransport {
52
  // connectionLog is for connection related messages only
53
  private static final Logger connectionLog = Logger.getLogger(
1✔
54
      String.format("%s.connections", NettyServerTransport.class.getName()));
1✔
55

56
  // Some exceptions are not very useful and add too much noise to the log
57
  private static final ImmutableList<String> QUIET_EXCEPTIONS = ImmutableList.of(
1✔
58
      "NativeIoException" /* Netty exceptions */);
59

60
  private final InternalLogId logId;
61
  private final Channel channel;
62
  private final ChannelPromise channelUnused;
63
  private final ProtocolNegotiator protocolNegotiator;
64
  private final int maxStreams;
65
  // only accessed from channel event loop
66
  private NettyServerHandler grpcHandler;
67
  private ServerTransportListener listener;
68
  private boolean terminated;
69
  private final boolean autoFlowControl;
70
  private final int flowControlWindow;
71
  private final int maxMessageSize;
72
  private final int maxHeaderListSize;
73
  private final long keepAliveTimeInNanos;
74
  private final long keepAliveTimeoutInNanos;
75
  private final long maxConnectionIdleInNanos;
76
  private final long maxConnectionAgeInNanos;
77
  private final long maxConnectionAgeGraceInNanos;
78
  private final boolean permitKeepAliveWithoutCalls;
79
  private final long permitKeepAliveTimeInNanos;
80
  private final int maxRstCount;
81
  private final long maxRstPeriodNanos;
82
  private final Attributes eagAttributes;
83
  private final List<? extends ServerStreamTracer.Factory> streamTracerFactories;
84
  private final TransportTracer transportTracer;
85

86
  NettyServerTransport(
87
      Channel channel,
88
      ChannelPromise channelUnused,
89
      ProtocolNegotiator protocolNegotiator,
90
      List<? extends ServerStreamTracer.Factory> streamTracerFactories,
91
      TransportTracer transportTracer,
92
      int maxStreams,
93
      boolean autoFlowControl,
94
      int flowControlWindow,
95
      int maxMessageSize,
96
      int maxHeaderListSize,
97
      long keepAliveTimeInNanos,
98
      long keepAliveTimeoutInNanos,
99
      long maxConnectionIdleInNanos,
100
      long maxConnectionAgeInNanos,
101
      long maxConnectionAgeGraceInNanos,
102
      boolean permitKeepAliveWithoutCalls,
103
      long permitKeepAliveTimeInNanos,
104
      int maxRstCount,
105
      long maxRstPeriodNanos,
106
      Attributes eagAttributes) {
1✔
107
    this.channel = Preconditions.checkNotNull(channel, "channel");
1✔
108
    this.channelUnused = channelUnused;
1✔
109
    this.protocolNegotiator = Preconditions.checkNotNull(protocolNegotiator, "protocolNegotiator");
1✔
110
    this.streamTracerFactories =
1✔
111
        Preconditions.checkNotNull(streamTracerFactories, "streamTracerFactories");
1✔
112
    this.transportTracer = Preconditions.checkNotNull(transportTracer, "transportTracer");
1✔
113
    this.maxStreams = maxStreams;
1✔
114
    this.autoFlowControl = autoFlowControl;
1✔
115
    this.flowControlWindow = flowControlWindow;
1✔
116
    this.maxMessageSize = maxMessageSize;
1✔
117
    this.maxHeaderListSize = maxHeaderListSize;
1✔
118
    this.keepAliveTimeInNanos = keepAliveTimeInNanos;
1✔
119
    this.keepAliveTimeoutInNanos = keepAliveTimeoutInNanos;
1✔
120
    this.maxConnectionIdleInNanos = maxConnectionIdleInNanos;
1✔
121
    this.maxConnectionAgeInNanos = maxConnectionAgeInNanos;
1✔
122
    this.maxConnectionAgeGraceInNanos = maxConnectionAgeGraceInNanos;
1✔
123
    this.permitKeepAliveWithoutCalls = permitKeepAliveWithoutCalls;
1✔
124
    this.permitKeepAliveTimeInNanos = permitKeepAliveTimeInNanos;
1✔
125
    this.maxRstCount = maxRstCount;
1✔
126
    this.maxRstPeriodNanos = maxRstPeriodNanos;
1✔
127
    this.eagAttributes = Preconditions.checkNotNull(eagAttributes, "eagAttributes");
1✔
128
    SocketAddress remote = channel.remoteAddress();
1✔
129
    this.logId = InternalLogId.allocate(getClass(), remote != null ? remote.toString() : null);
1✔
130
  }
1✔
131

132
  public void start(ServerTransportListener listener) {
133
    Preconditions.checkState(this.listener == null, "Handler already registered");
1✔
134
    this.listener = listener;
1✔
135

136
    // Create the Netty handler for the pipeline.
137
    grpcHandler = createHandler(listener, channelUnused);
1✔
138

139
    // Notify when the channel closes.
140
    final class TerminationNotifier implements ChannelFutureListener {
1✔
141
      boolean done;
142

143
      @Override
144
      public void operationComplete(ChannelFuture future) throws Exception {
145
        if (!done) {
1✔
146
          done = true;
1✔
147
          notifyTerminated(grpcHandler.connectionError());
1✔
148
        }
149
      }
1✔
150
    }
151

152
    ChannelHandler negotiationHandler = protocolNegotiator.newHandler(grpcHandler);
1✔
153
    ChannelHandler bufferingHandler = new WriteBufferingAndExceptionHandler(negotiationHandler);
1✔
154

155
    ChannelFutureListener terminationNotifier = new TerminationNotifier();
1✔
156
    channelUnused.addListener(terminationNotifier);
1✔
157
    channel.closeFuture().addListener(terminationNotifier);
1✔
158

159
    channel.pipeline().addLast(bufferingHandler);
1✔
160
  }
1✔
161

162
  @Override
163
  public ScheduledExecutorService getScheduledExecutorService() {
164
    return channel.eventLoop();
1✔
165
  }
166

167
  @Override
168
  public void shutdown() {
169
    if (channel.isOpen()) {
1✔
170
      channel.close();
1✔
171
    }
172
  }
1✔
173

174
  @Override
175
  public void shutdownNow(Status reason) {
176
    if (channel.isOpen()) {
1✔
177
      channel.writeAndFlush(new ForcefulCloseCommand(reason));
1✔
178
    }
179
  }
1✔
180

181
  @Override
182
  public InternalLogId getLogId() {
183
    return logId;
1✔
184
  }
185

186
  /**
187
   * For testing purposes only.
188
   */
189
  Channel channel() {
190
    return channel;
1✔
191
  }
192

193
  /**
194
   * Accepts a throwable and returns the appropriate logging level. Uninteresting exceptions
195
   * should not clutter the log.
196
   */
197
  @VisibleForTesting
198
  static Level getLogLevel(Throwable t) {
199
    if (t.getClass().equals(IOException.class)
1✔
200
        || t.getClass().equals(SocketException.class)
1✔
201
        || QUIET_EXCEPTIONS.contains(t.getClass().getSimpleName())) {
1✔
202
      return Level.FINE;
1✔
203
    }
204
    return Level.INFO;
1✔
205
  }
206

207
  private void notifyTerminated(Throwable t) {
208
    if (t != null) {
1✔
209
      connectionLog.log(getLogLevel(t), "Transport failed", t);
1✔
210
    }
211
    if (!terminated) {
1✔
212
      terminated = true;
1✔
213
      listener.transportTerminated();
1✔
214
    }
215
  }
1✔
216

217
  @Override
218
  public ListenableFuture<SocketStats> getStats() {
219
    final SettableFuture<SocketStats> result = SettableFuture.create();
1✔
220
    if (channel.eventLoop().inEventLoop()) {
1✔
221
      // This is necessary, otherwise we will block forever if we get the future from inside
222
      // the event loop.
223
      result.set(getStatsHelper(channel));
×
224
      return result;
×
225
    }
226
    channel.eventLoop().submit(
1✔
227
        new Runnable() {
1✔
228
          @Override
229
          public void run() {
230
            result.set(getStatsHelper(channel));
1✔
231
          }
1✔
232
        })
233
        .addListener(
1✔
234
            new GenericFutureListener<Future<Object>>() {
1✔
235
              @Override
236
              public void operationComplete(Future<Object> future) throws Exception {
237
                if (!future.isSuccess()) {
1✔
238
                  result.setException(future.cause());
×
239
                }
240
              }
1✔
241
            });
242
    return result;
1✔
243
  }
244

245
  private SocketStats getStatsHelper(Channel ch) {
246
    Preconditions.checkState(ch.eventLoop().inEventLoop());
1✔
247
    return new SocketStats(
1✔
248
        transportTracer.getStats(),
1✔
249
        channel.localAddress(),
1✔
250
        channel.remoteAddress(),
1✔
251
        Utils.getSocketOptions(ch),
1✔
252
        grpcHandler == null ? null : grpcHandler.getSecurityInfo());
1✔
253

254
  }
255

256
  @Override
257
  public String toString() {
258
    return MoreObjects.toStringHelper(this)
×
259
        .add("logId", logId.getId())
×
260
        .add("channel", channel)
×
261
        .toString();
×
262
  }
263

264
  /**
265
   * Creates the Netty handler to be used in the channel pipeline.
266
   */
267
  private NettyServerHandler createHandler(
268
      ServerTransportListener transportListener, ChannelPromise channelUnused) {
269
    return NettyServerHandler.newHandler(
1✔
270
        transportListener,
271
        channelUnused,
272
        streamTracerFactories,
273
        transportTracer,
274
        maxStreams,
275
        autoFlowControl,
276
        flowControlWindow,
277
        maxHeaderListSize,
278
        maxMessageSize,
279
        keepAliveTimeInNanos,
280
        keepAliveTimeoutInNanos,
281
        maxConnectionIdleInNanos,
282
        maxConnectionAgeInNanos,
283
        maxConnectionAgeGraceInNanos,
284
        permitKeepAliveWithoutCalls,
285
        permitKeepAliveTimeInNanos,
286
        maxRstCount,
287
        maxRstPeriodNanos,
288
        eagAttributes);
289
  }
290
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc