• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19696

14 Feb 2025 11:47PM CUT coverage: 88.593% (-0.002%) from 88.595%
#19696

push

github

web-flow
s2a: Don't allow S2AStub to be set

S2AStub is an internal API and shouldn't be used outside of s2a. It is
still available for tests.

IntegrationTest was moved to io.grpc.s2a. It uses an io.grpc.s2a class,
so it shouldn't be in internal.handler

34257 of 38668 relevant lines covered (88.59%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.04
/../netty/src/main/java/io/grpc/netty/NettyServer.java
1
/*
2
 * Copyright 2014 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.netty;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_AGE_NANOS_DISABLED;
21
import static io.netty.channel.ChannelOption.ALLOCATOR;
22
import static io.netty.channel.ChannelOption.SO_KEEPALIVE;
23

24
import com.google.common.base.MoreObjects;
25
import com.google.common.base.Preconditions;
26
import com.google.common.util.concurrent.ListenableFuture;
27
import com.google.common.util.concurrent.SettableFuture;
28
import io.grpc.Attributes;
29
import io.grpc.InternalChannelz;
30
import io.grpc.InternalChannelz.SocketStats;
31
import io.grpc.InternalInstrumented;
32
import io.grpc.InternalLogId;
33
import io.grpc.InternalWithLogId;
34
import io.grpc.ServerStreamTracer;
35
import io.grpc.internal.InternalServer;
36
import io.grpc.internal.ObjectPool;
37
import io.grpc.internal.ServerListener;
38
import io.grpc.internal.ServerTransportListener;
39
import io.grpc.internal.TransportTracer;
40
import io.netty.bootstrap.ServerBootstrap;
41
import io.netty.channel.Channel;
42
import io.netty.channel.ChannelFactory;
43
import io.netty.channel.ChannelFuture;
44
import io.netty.channel.ChannelFutureListener;
45
import io.netty.channel.ChannelInitializer;
46
import io.netty.channel.ChannelOption;
47
import io.netty.channel.ChannelPromise;
48
import io.netty.channel.EventLoop;
49
import io.netty.channel.EventLoopGroup;
50
import io.netty.channel.ServerChannel;
51
import io.netty.channel.group.ChannelGroup;
52
import io.netty.channel.group.ChannelGroupFuture;
53
import io.netty.channel.group.ChannelGroupFutureListener;
54
import io.netty.channel.group.DefaultChannelGroup;
55
import io.netty.util.AbstractReferenceCounted;
56
import io.netty.util.ReferenceCounted;
57
import io.netty.util.concurrent.Future;
58
import io.netty.util.concurrent.GenericFutureListener;
59
import java.io.IOException;
60
import java.net.SocketAddress;
61
import java.util.ArrayList;
62
import java.util.Collections;
63
import java.util.HashMap;
64
import java.util.Iterator;
65
import java.util.List;
66
import java.util.Map;
67
import java.util.concurrent.Callable;
68
import java.util.logging.Level;
69
import java.util.logging.Logger;
70

71
/**
72
 * Netty-based server implementation.
73
 */
74
class NettyServer implements InternalServer, InternalWithLogId {
75
  private static final Logger log = Logger.getLogger(InternalServer.class.getName());
1✔
76

77
  private final InternalLogId logId;
78
  private final List<? extends SocketAddress> addresses;
79
  private final ChannelFactory<? extends ServerChannel> channelFactory;
80
  private final Map<ChannelOption<?>, ?> channelOptions;
81
  private final Map<ChannelOption<?>, ?> childChannelOptions;
82
  private final ProtocolNegotiator protocolNegotiator;
83
  private final int maxStreamsPerConnection;
84
  private final ObjectPool<? extends EventLoopGroup> bossGroupPool;
85
  private final ObjectPool<? extends EventLoopGroup> workerGroupPool;
86
  private final boolean forceHeapBuffer;
87
  private EventLoopGroup bossGroup;
88
  private EventLoopGroup workerGroup;
89
  private ServerListener listener;
90
  private final ChannelGroup channelGroup;
91
  private final boolean autoFlowControl;
92
  private final int flowControlWindow;
93
  private final int maxMessageSize;
94
  private final int maxHeaderListSize;
95
  private final int softLimitHeaderListSize;
96
  private final long keepAliveTimeInNanos;
97
  private final long keepAliveTimeoutInNanos;
98
  private final long maxConnectionIdleInNanos;
99
  private final long maxConnectionAgeInNanos;
100
  private final long maxConnectionAgeGraceInNanos;
101
  private final boolean permitKeepAliveWithoutCalls;
102
  private final long permitKeepAliveTimeInNanos;
103
  private final int maxRstCount;
104
  private final long maxRstPeriodNanos;
105
  private final Attributes eagAttributes;
106
  private final ReferenceCounted sharedResourceReferenceCounter =
1✔
107
      new SharedResourceReferenceCounter();
108
  private final List<? extends ServerStreamTracer.Factory> streamTracerFactories;
109
  private final TransportTracer.Factory transportTracerFactory;
110
  private final InternalChannelz channelz;
111
  private volatile List<InternalInstrumented<SocketStats>> listenSocketStatsList =
1✔
112
      Collections.emptyList();
1✔
113
  private volatile boolean terminated;
114
  private final EventLoop bossExecutor;
115

116
  /**
   * Creates a server for the given addresses; nothing is bound until {@link #start} is called.
   *
   * <p>Every reference argument that is null-checked is validated before the boss and worker
   * event loop groups are taken from their pools, so a rejected argument does not leave a pooled
   * group checked out. The two option maps are copied, so later changes to the caller's maps are
   * not seen by this server.
   *
   * @throws NullPointerException if {@code addresses}, {@code channelFactory},
   *     {@code channelOptions}, {@code childChannelOptions}, {@code bossGroupPool},
   *     {@code workerGroupPool}, {@code protocolNegotiator}, {@code streamTracerFactories},
   *     {@code eagAttributes} or {@code channelz} is null
   */
  NettyServer(
      List<? extends SocketAddress> addresses,
      ChannelFactory<? extends ServerChannel> channelFactory,
      Map<ChannelOption<?>, ?> channelOptions,
      Map<ChannelOption<?>, ?> childChannelOptions,
      ObjectPool<? extends EventLoopGroup> bossGroupPool,
      ObjectPool<? extends EventLoopGroup> workerGroupPool,
      boolean forceHeapBuffer,
      ProtocolNegotiator protocolNegotiator,
      List<? extends ServerStreamTracer.Factory> streamTracerFactories,
      TransportTracer.Factory transportTracerFactory,
      int maxStreamsPerConnection,
      boolean autoFlowControl,
      int flowControlWindow,
      int maxMessageSize,
      int maxHeaderListSize,
      int softLimitHeaderListSize,
      long keepAliveTimeInNanos,
      long keepAliveTimeoutInNanos,
      long maxConnectionIdleInNanos,
      long maxConnectionAgeInNanos, long maxConnectionAgeGraceInNanos,
      boolean permitKeepAliveWithoutCalls, long permitKeepAliveTimeInNanos,
      int maxRstCount, long maxRstPeriodNanos,
      Attributes eagAttributes, InternalChannelz channelz) {
    // Validate all reference arguments up front, before any pooled resource is acquired.
    this.addresses = checkNotNull(addresses, "addresses");
    this.channelFactory = checkNotNull(channelFactory, "channelFactory");
    checkNotNull(channelOptions, "channelOptions");
    checkNotNull(childChannelOptions, "childChannelOptions");
    this.bossGroupPool = checkNotNull(bossGroupPool, "bossGroupPool");
    this.workerGroupPool = checkNotNull(workerGroupPool, "workerGroupPool");
    this.protocolNegotiator = checkNotNull(protocolNegotiator, "protocolNegotiator");
    this.streamTracerFactories = checkNotNull(streamTracerFactories, "streamTracerFactories");
    this.eagAttributes = checkNotNull(eagAttributes, "eagAttributes");
    this.channelz = checkNotNull(channelz, "channelz");

    // Defensive copies of the option maps.
    this.channelOptions = new HashMap<ChannelOption<?>, Object>(channelOptions);
    this.childChannelOptions = new HashMap<ChannelOption<?>, Object>(childChannelOptions);

    // Plain configuration values.
    this.forceHeapBuffer = forceHeapBuffer;
    this.transportTracerFactory = transportTracerFactory;
    this.maxStreamsPerConnection = maxStreamsPerConnection;
    this.autoFlowControl = autoFlowControl;
    this.flowControlWindow = flowControlWindow;
    this.maxMessageSize = maxMessageSize;
    this.maxHeaderListSize = maxHeaderListSize;
    this.softLimitHeaderListSize = softLimitHeaderListSize;
    this.keepAliveTimeInNanos = keepAliveTimeInNanos;
    this.keepAliveTimeoutInNanos = keepAliveTimeoutInNanos;
    this.maxConnectionIdleInNanos = maxConnectionIdleInNanos;
    this.maxConnectionAgeInNanos = maxConnectionAgeInNanos;
    this.maxConnectionAgeGraceInNanos = maxConnectionAgeGraceInNanos;
    this.permitKeepAliveWithoutCalls = permitKeepAliveWithoutCalls;
    this.permitKeepAliveTimeInNanos = permitKeepAliveTimeInNanos;
    this.maxRstCount = maxRstCount;
    this.maxRstPeriodNanos = maxRstPeriodNanos;
    this.logId = InternalLogId.allocate(getClass(), addresses.isEmpty() ? "No address" :
        String.valueOf(addresses));

    // Acquire pooled event loop groups last. A single boss event loop is picked and reused for
    // the channel group and for binding.
    this.bossGroup = bossGroupPool.getObject();
    this.bossExecutor = bossGroup.next();
    this.channelGroup = new DefaultChannelGroup(this.bossExecutor);
    this.workerGroup = workerGroupPool.getObject();
  }
1✔
176

177
  @Override
178
  public SocketAddress getListenSocketAddress() {
179
    Iterator<Channel> it = channelGroup.iterator();
1✔
180
    if (it.hasNext()) {
1✔
181
      return it.next().localAddress();
1✔
182
    } else {
183
      // server is not listening/bound yet, just return the original port.
184
      return addresses.isEmpty() ? null : addresses.get(0);
1✔
185
    }
186
  }
187

188
  @Override
189
  public List<SocketAddress> getListenSocketAddresses() {
190
    List<SocketAddress> listenSocketAddresses = new ArrayList<>();
1✔
191
    for (Channel c: channelGroup) {
1✔
192
      listenSocketAddresses.add(c.localAddress());
1✔
193
    }
1✔
194
    // server is not listening/bound yet, just return the original ports.
195
    if (listenSocketAddresses.isEmpty())  {
1✔
196
      listenSocketAddresses.addAll(addresses);
1✔
197
    }
198
    return listenSocketAddresses;
1✔
199
  }
200

201
  @Override
202
  public InternalInstrumented<SocketStats> getListenSocketStats() {
203
    List<InternalInstrumented<SocketStats>> savedListenSocketStatsList = listenSocketStatsList;
1✔
204
    return savedListenSocketStatsList.isEmpty() ? null : savedListenSocketStatsList.get(0);
1✔
205
  }
206

207
  @Override
208
  public List<InternalInstrumented<SocketStats>> getListenSocketStatsList() {
209
    return listenSocketStatsList;
1✔
210
  }
211

212
  @Override
213
  public void start(ServerListener serverListener) throws IOException {
214
    listener = checkNotNull(serverListener, "serverListener");
1✔
215

216
    final ServerBootstrap b = new ServerBootstrap();
1✔
217
    b.option(ALLOCATOR, Utils.getByteBufAllocator(forceHeapBuffer));
1✔
218
    b.childOption(ALLOCATOR, Utils.getByteBufAllocator(forceHeapBuffer));
1✔
219
    b.group(bossExecutor, workerGroup);
1✔
220
    b.channelFactory(channelFactory);
1✔
221
    // For non-socket based channel, the option will be ignored.
222
    b.childOption(SO_KEEPALIVE, true);
1✔
223

224
    if (channelOptions != null) {
1✔
225
      for (Map.Entry<ChannelOption<?>, ?> entry : channelOptions.entrySet()) {
1✔
226
        @SuppressWarnings("unchecked")
227
        ChannelOption<Object> key = (ChannelOption<Object>) entry.getKey();
×
228
        b.option(key, entry.getValue());
×
229
      }
×
230
    }
231

232
    if (childChannelOptions != null) {
1✔
233
      for (Map.Entry<ChannelOption<?>, ?> entry : childChannelOptions.entrySet()) {
1✔
234
        @SuppressWarnings("unchecked")
235
        ChannelOption<Object> key = (ChannelOption<Object>) entry.getKey();
1✔
236
        b.childOption(key, entry.getValue());
1✔
237
      }
1✔
238
    }
239

240
    b.childHandler(new ChannelInitializer<Channel>() {
1✔
241
      @Override
242
      public void initChannel(Channel ch) {
243

244
        ChannelPromise channelDone = ch.newPromise();
1✔
245

246
        long maxConnectionAgeInNanos = NettyServer.this.maxConnectionAgeInNanos;
1✔
247
        if (maxConnectionAgeInNanos != MAX_CONNECTION_AGE_NANOS_DISABLED) {
1✔
248
          // apply a random jitter of +/-10% to max connection age
249
          maxConnectionAgeInNanos =
1✔
250
              (long) ((.9D + Math.random() * .2D) * maxConnectionAgeInNanos);
1✔
251
        }
252

253
            NettyServerTransport transport =
1✔
254
                new NettyServerTransport(
255
                    ch,
256
                    channelDone,
257
                    protocolNegotiator,
1✔
258
                    streamTracerFactories,
1✔
259
                    transportTracerFactory.create(),
1✔
260
                    maxStreamsPerConnection,
1✔
261
                    autoFlowControl,
1✔
262
                    flowControlWindow,
1✔
263
                    maxMessageSize,
1✔
264
                    maxHeaderListSize,
1✔
265
                    softLimitHeaderListSize,
1✔
266
                    keepAliveTimeInNanos,
1✔
267
                    keepAliveTimeoutInNanos,
1✔
268
                    maxConnectionIdleInNanos,
1✔
269
                    maxConnectionAgeInNanos,
270
                    maxConnectionAgeGraceInNanos,
1✔
271
                    permitKeepAliveWithoutCalls,
1✔
272
                    permitKeepAliveTimeInNanos,
1✔
273
                    maxRstCount,
1✔
274
                    maxRstPeriodNanos,
1✔
275
                    eagAttributes);
1✔
276
        ServerTransportListener transportListener;
277
        // This is to order callbacks on the listener, not to guard access to channel.
278
        synchronized (NettyServer.this) {
1✔
279
          if (terminated) {
1✔
280
            // Server already terminated.
281
            ch.close();
×
282
            return;
×
283
          }
284
          // `channel` shutdown can race with `ch` initialization, so this is only safe to increment
285
          // inside the lock.
286
          sharedResourceReferenceCounter.retain();
1✔
287
          transportListener = listener.transportCreated(transport);
1✔
288
        }
1✔
289

290
        /* Releases the event loop if the channel is "done", possibly due to the channel closing. */
291
        final class LoopReleaser implements ChannelFutureListener {
1✔
292
          private boolean done;
293

294
          @Override
295
          public void operationComplete(ChannelFuture future) throws Exception {
296
            if (!done) {
1✔
297
              done = true;
1✔
298
              sharedResourceReferenceCounter.release();
1✔
299
            }
300
          }
1✔
301
        }
302

303
        transport.start(transportListener);
1✔
304
        ChannelFutureListener loopReleaser = new LoopReleaser();
1✔
305
        channelDone.addListener(loopReleaser);
1✔
306
        ch.closeFuture().addListener(loopReleaser);
1✔
307
      }
1✔
308
    });
309
    Future<Map<ChannelFuture, SocketAddress>> bindCallFuture =
1✔
310
        bossExecutor.submit(
1✔
311
            new Callable<Map<ChannelFuture, SocketAddress>>() {
1✔
312
          @Override
313
          public Map<ChannelFuture, SocketAddress> call() {
314
            Map<ChannelFuture, SocketAddress> bindFutures = new HashMap<>();
1✔
315
            for (SocketAddress address: addresses) {
1✔
316
                ChannelFuture future = b.bind(address);
1✔
317
                channelGroup.add(future.channel());
1✔
318
                bindFutures.put(future, address);
1✔
319
            }
1✔
320
            return bindFutures;
1✔
321
          }
322
        }
323
    );
324
    Map<ChannelFuture, SocketAddress> channelFutures =
1✔
325
        bindCallFuture.awaitUninterruptibly().getNow();
1✔
326

327
    if (!bindCallFuture.isSuccess()) {
1✔
328
      channelGroup.close().awaitUninterruptibly();
1✔
329
      throw new IOException(String.format("Failed to bind to addresses %s",
1✔
330
          addresses), bindCallFuture.cause());
1✔
331
    }
332
    final List<InternalInstrumented<SocketStats>> socketStats = new ArrayList<>();
1✔
333
    for (Map.Entry<ChannelFuture, SocketAddress> entry: channelFutures.entrySet()) {
1✔
334
      // We'd love to observe interruption, but if interrupted we will need to close the channel,
335
      // which itself would need an await() to guarantee the port is not used when the method
336
      // returns. See #6850
337
      final ChannelFuture future = entry.getKey();
1✔
338
      if (!future.awaitUninterruptibly().isSuccess()) {
1✔
339
        channelGroup.close().awaitUninterruptibly();
1✔
340
        throw new IOException(String.format("Failed to bind to address %s",
1✔
341
            entry.getValue()), future.cause());
1✔
342
      }
343
      final InternalInstrumented<SocketStats> listenSocketStats =
1✔
344
          new ListenSocket(future.channel());
1✔
345
      channelz.addListenSocket(listenSocketStats);
1✔
346
      socketStats.add(listenSocketStats);
1✔
347
      future.channel().closeFuture().addListener(new ChannelFutureListener() {
1✔
348
        @Override
349
        public void operationComplete(ChannelFuture future) throws Exception {
350
          channelz.removeListenSocket(listenSocketStats);
1✔
351
        }
1✔
352
      });
353
    }
1✔
354
    listenSocketStatsList = Collections.unmodifiableList(socketStats);
1✔
355
  }
1✔
356

357
  @Override
358
  public void shutdown() {
359
    if (terminated) {
1✔
360
      return;
×
361
    }
362
    ChannelGroupFuture groupFuture = channelGroup.close()
1✔
363
        .addListener(new ChannelGroupFutureListener() {
1✔
364
            @Override
365
            public void operationComplete(ChannelGroupFuture future) throws Exception {
366
              if (!future.isSuccess()) {
1✔
367
                log.log(Level.WARNING, "Error closing server channel group", future.cause());
×
368
              }
369
              sharedResourceReferenceCounter.release();
1✔
370
              protocolNegotiator.close();
1✔
371
              listenSocketStatsList = Collections.emptyList();
1✔
372
              synchronized (NettyServer.this) {
1✔
373
                listener.serverShutdown();
1✔
374
                terminated = true;
1✔
375
              }
1✔
376
            }
1✔
377
        });
378
    try {
379
      groupFuture.await();
1✔
380
    } catch (InterruptedException e) {
×
381
      log.log(Level.FINE, "Interrupted while shutting down", e);
×
382
      Thread.currentThread().interrupt();
×
383
    }
1✔
384
  }
1✔
385

386
  @Override
387
  public InternalLogId getLogId() {
388
    return logId;
×
389
  }
390

391
  @Override
392
  public String toString() {
393
    return MoreObjects.toStringHelper(this)
×
394
        .add("logId", logId.getId())
×
395
        .add("addresses", addresses)
×
396
        .toString();
×
397
  }
398

399
  class SharedResourceReferenceCounter extends AbstractReferenceCounted {
1✔
400
    @Override
401
    protected void deallocate() {
402
      try {
403
        if (bossGroup != null) {
1✔
404
          bossGroupPool.returnObject(bossGroup);
1✔
405
        }
406
      } finally {
407
        bossGroup = null;
1✔
408
        try {
409
          if (workerGroup != null) {
1✔
410
            workerGroupPool.returnObject(workerGroup);
1✔
411
          }
412
        } finally {
413
          workerGroup = null;
1✔
414
        }
415
      }
416
    }
1✔
417

418
    @Override
419
    public ReferenceCounted touch(Object hint) {
420
      return this;
×
421
    }
422
  }
423

424
  /**
425
   * A class that can answer channelz queries about the server listen sockets.
426
   */
427
  private static final class ListenSocket implements InternalInstrumented<SocketStats> {
428
    private final InternalLogId id;
429
    private final Channel ch;
430

431
    ListenSocket(Channel ch) {
1✔
432
      this.ch = ch;
1✔
433
      this.id = InternalLogId.allocate(getClass(), String.valueOf(ch.localAddress()));
1✔
434
    }
1✔
435

436
    @Override
437
    public ListenableFuture<SocketStats> getStats() {
438
      final SettableFuture<SocketStats> ret = SettableFuture.create();
1✔
439
      if (ch.eventLoop().inEventLoop()) {
1✔
440
        // This is necessary, otherwise we will block forever if we get the future from inside
441
        // the event loop.
442
        ret.set(new SocketStats(
×
443
            /*data=*/ null,
444
            ch.localAddress(),
×
445
            /*remote=*/ null,
446
            Utils.getSocketOptions(ch),
×
447
            /*security=*/ null));
448
        return ret;
×
449
      }
450
      ch.eventLoop()
1✔
451
          .submit(
1✔
452
              new Runnable() {
1✔
453
                @Override
454
                public void run() {
455
                  ret.set(new SocketStats(
1✔
456
                      /*data=*/ null,
457
                      ch.localAddress(),
1✔
458
                      /*remote=*/ null,
459
                      Utils.getSocketOptions(ch),
1✔
460
                      /*security=*/ null));
461
                }
1✔
462
              })
463
          .addListener(
1✔
464
              new GenericFutureListener<Future<Object>>() {
1✔
465
                @Override
466
                public void operationComplete(Future<Object> future) throws Exception {
467
                  if (!future.isSuccess()) {
1✔
468
                    ret.setException(future.cause());
×
469
                  }
470
                }
1✔
471
              });
472
      return ret;
1✔
473
    }
474

475
    @Override
476
    public InternalLogId getLogId() {
477
      return id;
1✔
478
    }
479

480
    @Override
481
    public String toString() {
482
      return MoreObjects.toStringHelper(this)
×
483
          .add("logId", id.getId())
×
484
          .add("channel", ch)
×
485
          .toString();
×
486
    }
487
  }
488
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc