• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #18908

20 Nov 2023 09:54PM CUT coverage: 88.231% (-0.02%) from 88.249%
#18908

push

github

ejona86
netty: Add option to limit RST_STREAM rate

The behavior purposefully mirrors that of Netty's
AbstractHttp2ConnectionHandlerBuilder.decoderEnforceMaxRstFramesPerWindow().
That API is not available to our code as we extend the
Http2ConnectionHandler, but we want our API to be able to delegate to
Netty's in the future if that ever becomes possible.

30311 of 34354 relevant lines covered (88.23%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.89
/../netty/src/main/java/io/grpc/netty/NettyServer.java
1
/*
2
 * Copyright 2014 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.netty;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_AGE_NANOS_DISABLED;
21
import static io.netty.channel.ChannelOption.ALLOCATOR;
22
import static io.netty.channel.ChannelOption.SO_KEEPALIVE;
23

24
import com.google.common.base.MoreObjects;
25
import com.google.common.base.Preconditions;
26
import com.google.common.util.concurrent.ListenableFuture;
27
import com.google.common.util.concurrent.SettableFuture;
28
import io.grpc.Attributes;
29
import io.grpc.InternalChannelz;
30
import io.grpc.InternalChannelz.SocketStats;
31
import io.grpc.InternalInstrumented;
32
import io.grpc.InternalLogId;
33
import io.grpc.InternalWithLogId;
34
import io.grpc.ServerStreamTracer;
35
import io.grpc.internal.InternalServer;
36
import io.grpc.internal.ObjectPool;
37
import io.grpc.internal.ServerListener;
38
import io.grpc.internal.ServerTransportListener;
39
import io.grpc.internal.TransportTracer;
40
import io.netty.bootstrap.ServerBootstrap;
41
import io.netty.channel.Channel;
42
import io.netty.channel.ChannelFactory;
43
import io.netty.channel.ChannelFuture;
44
import io.netty.channel.ChannelFutureListener;
45
import io.netty.channel.ChannelInitializer;
46
import io.netty.channel.ChannelOption;
47
import io.netty.channel.ChannelPromise;
48
import io.netty.channel.EventLoop;
49
import io.netty.channel.EventLoopGroup;
50
import io.netty.channel.ServerChannel;
51
import io.netty.channel.group.ChannelGroup;
52
import io.netty.channel.group.ChannelGroupFuture;
53
import io.netty.channel.group.ChannelGroupFutureListener;
54
import io.netty.channel.group.DefaultChannelGroup;
55
import io.netty.util.AbstractReferenceCounted;
56
import io.netty.util.ReferenceCounted;
57
import io.netty.util.concurrent.Future;
58
import io.netty.util.concurrent.GenericFutureListener;
59
import java.io.IOException;
60
import java.net.SocketAddress;
61
import java.util.ArrayList;
62
import java.util.Collections;
63
import java.util.HashMap;
64
import java.util.Iterator;
65
import java.util.List;
66
import java.util.Map;
67
import java.util.concurrent.Callable;
68
import java.util.logging.Level;
69
import java.util.logging.Logger;
70

71
/**
72
 * Netty-based server implementation.
73
 */
74
class NettyServer implements InternalServer, InternalWithLogId {
75
  private static final Logger log = Logger.getLogger(InternalServer.class.getName());
1✔
76

77
  private final InternalLogId logId;
78
  private final List<? extends SocketAddress> addresses;
79
  private final ChannelFactory<? extends ServerChannel> channelFactory;
80
  private final Map<ChannelOption<?>, ?> channelOptions;
81
  private final Map<ChannelOption<?>, ?> childChannelOptions;
82
  private final ProtocolNegotiator protocolNegotiator;
83
  private final int maxStreamsPerConnection;
84
  private final ObjectPool<? extends EventLoopGroup> bossGroupPool;
85
  private final ObjectPool<? extends EventLoopGroup> workerGroupPool;
86
  private final boolean forceHeapBuffer;
87
  private EventLoopGroup bossGroup;
88
  private EventLoopGroup workerGroup;
89
  private ServerListener listener;
90
  private final ChannelGroup channelGroup;
91
  private final boolean autoFlowControl;
92
  private final int flowControlWindow;
93
  private final int maxMessageSize;
94
  private final int maxHeaderListSize;
95
  private final long keepAliveTimeInNanos;
96
  private final long keepAliveTimeoutInNanos;
97
  private final long maxConnectionIdleInNanos;
98
  private final long maxConnectionAgeInNanos;
99
  private final long maxConnectionAgeGraceInNanos;
100
  private final boolean permitKeepAliveWithoutCalls;
101
  private final long permitKeepAliveTimeInNanos;
102
  private final int maxRstCount;
103
  private final long maxRstPeriodNanos;
104
  private final Attributes eagAttributes;
105
  private final ReferenceCounted sharedResourceReferenceCounter =
1✔
106
      new SharedResourceReferenceCounter();
107
  private final List<? extends ServerStreamTracer.Factory> streamTracerFactories;
108
  private final TransportTracer.Factory transportTracerFactory;
109
  private final InternalChannelz channelz;
110
  private volatile List<InternalInstrumented<SocketStats>> listenSocketStatsList =
1✔
111
      Collections.emptyList();
1✔
112
  private volatile boolean terminated;
113
  private final EventLoop bossExecutor;
114

115
  /**
   * Creates a server that is configured but not yet bound; binding happens in {@link #start}.
   *
   * <p>Every null check runs before an event loop group is taken from {@code bossGroupPool} or
   * {@code workerGroupPool}. A rejected argument therefore cannot leave a pooled object checked
   * out with no owner left to return it.
   *
   * <p>{@code channelOptions} and {@code childChannelOptions} are copied, so later changes to the
   * caller's maps are not observed by this server.
   *
   * @throws NullPointerException if any of the arguments checked below is {@code null}
   */
  NettyServer(
      List<? extends SocketAddress> addresses,
      ChannelFactory<? extends ServerChannel> channelFactory,
      Map<ChannelOption<?>, ?> channelOptions,
      Map<ChannelOption<?>, ?> childChannelOptions,
      ObjectPool<? extends EventLoopGroup> bossGroupPool,
      ObjectPool<? extends EventLoopGroup> workerGroupPool,
      boolean forceHeapBuffer,
      ProtocolNegotiator protocolNegotiator,
      List<? extends ServerStreamTracer.Factory> streamTracerFactories,
      TransportTracer.Factory transportTracerFactory,
      int maxStreamsPerConnection, boolean autoFlowControl, int flowControlWindow,
      int maxMessageSize, int maxHeaderListSize,
      long keepAliveTimeInNanos, long keepAliveTimeoutInNanos,
      long maxConnectionIdleInNanos,
      long maxConnectionAgeInNanos, long maxConnectionAgeGraceInNanos,
      boolean permitKeepAliveWithoutCalls, long permitKeepAliveTimeInNanos,
      int maxRstCount, long maxRstPeriodNanos,
      Attributes eagAttributes, InternalChannelz channelz) {
    // Step 1: validate. Nothing in this section acquires a resource, so throwing is safe.
    this.addresses = checkNotNull(addresses, "addresses");
    this.channelFactory = checkNotNull(channelFactory, "channelFactory");
    checkNotNull(channelOptions, "channelOptions");
    this.channelOptions = new HashMap<ChannelOption<?>, Object>(channelOptions);
    checkNotNull(childChannelOptions, "childChannelOptions");
    this.childChannelOptions = new HashMap<ChannelOption<?>, Object>(childChannelOptions);
    this.bossGroupPool = checkNotNull(bossGroupPool, "bossGroupPool");
    this.workerGroupPool = checkNotNull(workerGroupPool, "workerGroupPool");
    this.protocolNegotiator = checkNotNull(protocolNegotiator, "protocolNegotiator");
    this.streamTracerFactories = checkNotNull(streamTracerFactories, "streamTracerFactories");
    this.eagAttributes = checkNotNull(eagAttributes, "eagAttributes");
    this.channelz = Preconditions.checkNotNull(channelz, "channelz");

    // Step 2: plain configuration values.
    // NOTE(review): transportTracerFactory is dereferenced for every accepted connection in
    // start() but is not null-checked here; confirm whether callers may legitimately pass null.
    this.transportTracerFactory = transportTracerFactory;
    this.forceHeapBuffer = forceHeapBuffer;
    this.maxStreamsPerConnection = maxStreamsPerConnection;
    this.autoFlowControl = autoFlowControl;
    this.flowControlWindow = flowControlWindow;
    this.maxMessageSize = maxMessageSize;
    this.maxHeaderListSize = maxHeaderListSize;
    this.keepAliveTimeInNanos = keepAliveTimeInNanos;
    this.keepAliveTimeoutInNanos = keepAliveTimeoutInNanos;
    this.maxConnectionIdleInNanos = maxConnectionIdleInNanos;
    this.maxConnectionAgeInNanos = maxConnectionAgeInNanos;
    this.maxConnectionAgeGraceInNanos = maxConnectionAgeGraceInNanos;
    this.permitKeepAliveWithoutCalls = permitKeepAliveWithoutCalls;
    this.permitKeepAliveTimeInNanos = permitKeepAliveTimeInNanos;
    this.maxRstCount = maxRstCount;
    this.maxRstPeriodNanos = maxRstPeriodNanos;
    this.logId = InternalLogId.allocate(getClass(), addresses.isEmpty() ? "No address" :
        String.valueOf(addresses));

    // Step 3: take the pooled event loop groups last, once nothing above can still fail.
    this.bossGroup = bossGroupPool.getObject();
    this.bossExecutor = bossGroup.next();
    this.channelGroup = new DefaultChannelGroup(this.bossExecutor);
    this.workerGroup = workerGroupPool.getObject();
  }
1✔
169

170
  @Override
171
  public SocketAddress getListenSocketAddress() {
172
    Iterator<Channel> it = channelGroup.iterator();
1✔
173
    if (it.hasNext()) {
1✔
174
      return it.next().localAddress();
1✔
175
    } else {
176
      // server is not listening/bound yet, just return the original port.
177
      return addresses.isEmpty() ? null : addresses.get(0);
1✔
178
    }
179
  }
180

181
  @Override
182
  public List<SocketAddress> getListenSocketAddresses() {
183
    List<SocketAddress> listenSocketAddresses = new ArrayList<>();
1✔
184
    for (Channel c: channelGroup) {
1✔
185
      listenSocketAddresses.add(c.localAddress());
1✔
186
    }
1✔
187
    // server is not listening/bound yet, just return the original ports.
188
    if (listenSocketAddresses.isEmpty())  {
1✔
189
      listenSocketAddresses.addAll(addresses);
1✔
190
    }
191
    return listenSocketAddresses;
1✔
192
  }
193

194
  @Override
195
  public InternalInstrumented<SocketStats> getListenSocketStats() {
196
    List<InternalInstrumented<SocketStats>> savedListenSocketStatsList = listenSocketStatsList;
1✔
197
    return savedListenSocketStatsList.isEmpty() ? null : savedListenSocketStatsList.get(0);
1✔
198
  }
199

200
  @Override
201
  public List<InternalInstrumented<SocketStats>> getListenSocketStatsList() {
202
    return listenSocketStatsList;
1✔
203
  }
204

205
  @Override
206
  public void start(ServerListener serverListener) throws IOException {
207
    listener = checkNotNull(serverListener, "serverListener");
1✔
208

209
    final ServerBootstrap b = new ServerBootstrap();
1✔
210
    b.option(ALLOCATOR, Utils.getByteBufAllocator(forceHeapBuffer));
1✔
211
    b.childOption(ALLOCATOR, Utils.getByteBufAllocator(forceHeapBuffer));
1✔
212
    b.group(bossExecutor, workerGroup);
1✔
213
    b.channelFactory(channelFactory);
1✔
214
    // For non-socket based channel, the option will be ignored.
215
    b.childOption(SO_KEEPALIVE, true);
1✔
216

217
    if (channelOptions != null) {
1✔
218
      for (Map.Entry<ChannelOption<?>, ?> entry : channelOptions.entrySet()) {
1✔
219
        @SuppressWarnings("unchecked")
220
        ChannelOption<Object> key = (ChannelOption<Object>) entry.getKey();
×
221
        b.option(key, entry.getValue());
×
222
      }
×
223
    }
224

225
    if (childChannelOptions != null) {
1✔
226
      for (Map.Entry<ChannelOption<?>, ?> entry : childChannelOptions.entrySet()) {
1✔
227
        @SuppressWarnings("unchecked")
228
        ChannelOption<Object> key = (ChannelOption<Object>) entry.getKey();
1✔
229
        b.childOption(key, entry.getValue());
1✔
230
      }
1✔
231
    }
232

233
    b.childHandler(new ChannelInitializer<Channel>() {
1✔
234
      @Override
235
      public void initChannel(Channel ch) {
236

237
        ChannelPromise channelDone = ch.newPromise();
1✔
238

239
        long maxConnectionAgeInNanos = NettyServer.this.maxConnectionAgeInNanos;
1✔
240
        if (maxConnectionAgeInNanos != MAX_CONNECTION_AGE_NANOS_DISABLED) {
1✔
241
          // apply a random jitter of +/-10% to max connection age
242
          maxConnectionAgeInNanos =
1✔
243
              (long) ((.9D + Math.random() * .2D) * maxConnectionAgeInNanos);
1✔
244
        }
245

246
        NettyServerTransport transport =
1✔
247
            new NettyServerTransport(
248
                ch,
249
                channelDone,
250
                protocolNegotiator,
1✔
251
                streamTracerFactories,
1✔
252
                transportTracerFactory.create(),
1✔
253
                maxStreamsPerConnection,
1✔
254
                autoFlowControl,
1✔
255
                flowControlWindow,
1✔
256
                maxMessageSize,
1✔
257
                maxHeaderListSize,
1✔
258
                keepAliveTimeInNanos,
1✔
259
                keepAliveTimeoutInNanos,
1✔
260
                maxConnectionIdleInNanos,
1✔
261
                maxConnectionAgeInNanos,
262
                maxConnectionAgeGraceInNanos,
1✔
263
                permitKeepAliveWithoutCalls,
1✔
264
                permitKeepAliveTimeInNanos,
1✔
265
                maxRstCount,
1✔
266
                maxRstPeriodNanos,
1✔
267
                eagAttributes);
1✔
268
        ServerTransportListener transportListener;
269
        // This is to order callbacks on the listener, not to guard access to channel.
270
        synchronized (NettyServer.this) {
1✔
271
          if (terminated) {
1✔
272
            // Server already terminated.
273
            ch.close();
1✔
274
            return;
1✔
275
          }
276
          // `channel` shutdown can race with `ch` initialization, so this is only safe to increment
277
          // inside the lock.
278
          sharedResourceReferenceCounter.retain();
1✔
279
          transportListener = listener.transportCreated(transport);
1✔
280
        }
1✔
281

282
        /* Releases the event loop if the channel is "done", possibly due to the channel closing. */
283
        final class LoopReleaser implements ChannelFutureListener {
1✔
284
          private boolean done;
285

286
          @Override
287
          public void operationComplete(ChannelFuture future) throws Exception {
288
            if (!done) {
1✔
289
              done = true;
1✔
290
              sharedResourceReferenceCounter.release();
1✔
291
            }
292
          }
1✔
293
        }
294

295
        transport.start(transportListener);
1✔
296
        ChannelFutureListener loopReleaser = new LoopReleaser();
1✔
297
        channelDone.addListener(loopReleaser);
1✔
298
        ch.closeFuture().addListener(loopReleaser);
1✔
299
      }
1✔
300
    });
301
    Future<Map<ChannelFuture, SocketAddress>> bindCallFuture =
1✔
302
        bossExecutor.submit(
1✔
303
            new Callable<Map<ChannelFuture, SocketAddress>>() {
1✔
304
          @Override
305
          public Map<ChannelFuture, SocketAddress> call() {
306
            Map<ChannelFuture, SocketAddress> bindFutures = new HashMap<>();
1✔
307
            for (SocketAddress address: addresses) {
1✔
308
                ChannelFuture future = b.bind(address);
1✔
309
                channelGroup.add(future.channel());
1✔
310
                bindFutures.put(future, address);
1✔
311
            }
1✔
312
            return bindFutures;
1✔
313
          }
314
        }
315
    );
316
    Map<ChannelFuture, SocketAddress> channelFutures =
1✔
317
        bindCallFuture.awaitUninterruptibly().getNow();
1✔
318

319
    if (!bindCallFuture.isSuccess()) {
1✔
320
      channelGroup.close().awaitUninterruptibly();
1✔
321
      throw new IOException(String.format("Failed to bind to addresses %s",
1✔
322
          addresses), bindCallFuture.cause());
1✔
323
    }
324
    final List<InternalInstrumented<SocketStats>> socketStats = new ArrayList<>();
1✔
325
    for (Map.Entry<ChannelFuture, SocketAddress> entry: channelFutures.entrySet()) {
1✔
326
      // We'd love to observe interruption, but if interrupted we will need to close the channel,
327
      // which itself would need an await() to guarantee the port is not used when the method
328
      // returns. See #6850
329
      final ChannelFuture future = entry.getKey();
1✔
330
      if (!future.awaitUninterruptibly().isSuccess()) {
1✔
331
        channelGroup.close().awaitUninterruptibly();
1✔
332
        throw new IOException(String.format("Failed to bind to address %s",
1✔
333
            entry.getValue()), future.cause());
1✔
334
      }
335
      final InternalInstrumented<SocketStats> listenSocketStats =
1✔
336
          new ListenSocket(future.channel());
1✔
337
      channelz.addListenSocket(listenSocketStats);
1✔
338
      socketStats.add(listenSocketStats);
1✔
339
      future.channel().closeFuture().addListener(new ChannelFutureListener() {
1✔
340
        @Override
341
        public void operationComplete(ChannelFuture future) throws Exception {
342
          channelz.removeListenSocket(listenSocketStats);
1✔
343
        }
1✔
344
      });
345
    }
1✔
346
    listenSocketStatsList = Collections.unmodifiableList(socketStats);
1✔
347
  }
1✔
348

349
  @Override
350
  public void shutdown() {
351
    if (terminated) {
1✔
352
      return;
×
353
    }
354
    ChannelGroupFuture groupFuture = channelGroup.close()
1✔
355
        .addListener(new ChannelGroupFutureListener() {
1✔
356
            @Override
357
            public void operationComplete(ChannelGroupFuture future) throws Exception {
358
              if (!future.isSuccess()) {
1✔
359
                log.log(Level.WARNING, "Error closing server channel group", future.cause());
×
360
              }
361
              sharedResourceReferenceCounter.release();
1✔
362
              protocolNegotiator.close();
1✔
363
              listenSocketStatsList = Collections.emptyList();
1✔
364
              synchronized (NettyServer.this) {
1✔
365
                listener.serverShutdown();
1✔
366
                terminated = true;
1✔
367
              }
1✔
368
            }
1✔
369
        });
370
    try {
371
      groupFuture.await();
1✔
372
    } catch (InterruptedException e) {
×
373
      log.log(Level.FINE, "Interrupted while shutting down", e);
×
374
      Thread.currentThread().interrupt();
×
375
    }
1✔
376
  }
1✔
377

378
  @Override
379
  public InternalLogId getLogId() {
380
    return logId;
×
381
  }
382

383
  @Override
384
  public String toString() {
385
    return MoreObjects.toStringHelper(this)
×
386
        .add("logId", logId.getId())
×
387
        .add("addresses", addresses)
×
388
        .toString();
×
389
  }
390

391
  class SharedResourceReferenceCounter extends AbstractReferenceCounted {
1✔
392
    @Override
393
    protected void deallocate() {
394
      try {
395
        if (bossGroup != null) {
1✔
396
          bossGroupPool.returnObject(bossGroup);
1✔
397
        }
398
      } finally {
399
        bossGroup = null;
1✔
400
        try {
401
          if (workerGroup != null) {
1✔
402
            workerGroupPool.returnObject(workerGroup);
1✔
403
          }
404
        } finally {
405
          workerGroup = null;
1✔
406
        }
407
      }
408
    }
1✔
409

410
    @Override
411
    public ReferenceCounted touch(Object hint) {
412
      return this;
×
413
    }
414
  }
415

416
  /**
417
   * A class that can answer channelz queries about the server listen sockets.
418
   */
419
  private static final class ListenSocket implements InternalInstrumented<SocketStats> {
420
    private final InternalLogId id;
421
    private final Channel ch;
422

423
    ListenSocket(Channel ch) {
1✔
424
      this.ch = ch;
1✔
425
      this.id = InternalLogId.allocate(getClass(), String.valueOf(ch.localAddress()));
1✔
426
    }
1✔
427

428
    @Override
429
    public ListenableFuture<SocketStats> getStats() {
430
      final SettableFuture<SocketStats> ret = SettableFuture.create();
1✔
431
      if (ch.eventLoop().inEventLoop()) {
1✔
432
        // This is necessary, otherwise we will block forever if we get the future from inside
433
        // the event loop.
434
        ret.set(new SocketStats(
×
435
            /*data=*/ null,
436
            ch.localAddress(),
×
437
            /*remote=*/ null,
438
            Utils.getSocketOptions(ch),
×
439
            /*security=*/ null));
440
        return ret;
×
441
      }
442
      ch.eventLoop()
1✔
443
          .submit(
1✔
444
              new Runnable() {
1✔
445
                @Override
446
                public void run() {
447
                  ret.set(new SocketStats(
1✔
448
                      /*data=*/ null,
449
                      ch.localAddress(),
1✔
450
                      /*remote=*/ null,
451
                      Utils.getSocketOptions(ch),
1✔
452
                      /*security=*/ null));
453
                }
1✔
454
              })
455
          .addListener(
1✔
456
              new GenericFutureListener<Future<Object>>() {
1✔
457
                @Override
458
                public void operationComplete(Future<Object> future) throws Exception {
459
                  if (!future.isSuccess()) {
1✔
460
                    ret.setException(future.cause());
×
461
                  }
462
                }
1✔
463
              });
464
      return ret;
1✔
465
    }
466

467
    @Override
468
    public InternalLogId getLogId() {
469
      return id;
1✔
470
    }
471

472
    @Override
473
    public String toString() {
474
      return MoreObjects.toStringHelper(this)
×
475
          .add("logId", id.getId())
×
476
          .add("channel", ch)
×
477
          .toString();
×
478
    }
479
  }
480
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc