• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #18784

pending completion
#18784

push

github-actions

temawi
Bump version to 1.57.3-SNAPSHOT

30632 of 34707 relevant lines covered (88.26%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.68
/../netty/src/main/java/io/grpc/netty/NettyServer.java
1
/*
2
 * Copyright 2014 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.netty;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_AGE_NANOS_DISABLED;
21
import static io.netty.channel.ChannelOption.ALLOCATOR;
22
import static io.netty.channel.ChannelOption.SO_KEEPALIVE;
23

24
import com.google.common.base.MoreObjects;
25
import com.google.common.base.Preconditions;
26
import com.google.common.util.concurrent.ListenableFuture;
27
import com.google.common.util.concurrent.SettableFuture;
28
import io.grpc.Attributes;
29
import io.grpc.InternalChannelz;
30
import io.grpc.InternalChannelz.SocketStats;
31
import io.grpc.InternalInstrumented;
32
import io.grpc.InternalLogId;
33
import io.grpc.InternalWithLogId;
34
import io.grpc.ServerStreamTracer;
35
import io.grpc.internal.InternalServer;
36
import io.grpc.internal.ObjectPool;
37
import io.grpc.internal.ServerListener;
38
import io.grpc.internal.ServerTransportListener;
39
import io.grpc.internal.TransportTracer;
40
import io.netty.bootstrap.ServerBootstrap;
41
import io.netty.channel.Channel;
42
import io.netty.channel.ChannelFactory;
43
import io.netty.channel.ChannelFuture;
44
import io.netty.channel.ChannelFutureListener;
45
import io.netty.channel.ChannelInitializer;
46
import io.netty.channel.ChannelOption;
47
import io.netty.channel.ChannelPromise;
48
import io.netty.channel.EventLoop;
49
import io.netty.channel.EventLoopGroup;
50
import io.netty.channel.ServerChannel;
51
import io.netty.channel.group.ChannelGroup;
52
import io.netty.channel.group.ChannelGroupFuture;
53
import io.netty.channel.group.ChannelGroupFutureListener;
54
import io.netty.channel.group.DefaultChannelGroup;
55
import io.netty.util.AbstractReferenceCounted;
56
import io.netty.util.ReferenceCounted;
57
import io.netty.util.concurrent.Future;
58
import io.netty.util.concurrent.GenericFutureListener;
59
import java.io.IOException;
60
import java.net.SocketAddress;
61
import java.util.ArrayList;
62
import java.util.Collections;
63
import java.util.HashMap;
64
import java.util.Iterator;
65
import java.util.List;
66
import java.util.Map;
67
import java.util.concurrent.Callable;
68
import java.util.logging.Level;
69
import java.util.logging.Logger;
70

71
/**
72
 * Netty-based server implementation.
73
 */
74
class NettyServer implements InternalServer, InternalWithLogId {
75
  private static final Logger log = Logger.getLogger(InternalServer.class.getName());
1✔
76

77
  private final InternalLogId logId;
78
  private final List<? extends SocketAddress> addresses;
79
  private final ChannelFactory<? extends ServerChannel> channelFactory;
80
  private final Map<ChannelOption<?>, ?> channelOptions;
81
  private final Map<ChannelOption<?>, ?> childChannelOptions;
82
  private final ProtocolNegotiator protocolNegotiator;
83
  private final int maxStreamsPerConnection;
84
  private final ObjectPool<? extends EventLoopGroup> bossGroupPool;
85
  private final ObjectPool<? extends EventLoopGroup> workerGroupPool;
86
  private final boolean forceHeapBuffer;
87
  private EventLoopGroup bossGroup;
88
  private EventLoopGroup workerGroup;
89
  private ServerListener listener;
90
  private final ChannelGroup channelGroup;
91
  private final boolean autoFlowControl;
92
  private final int flowControlWindow;
93
  private final int maxMessageSize;
94
  private final int maxHeaderListSize;
95
  private final long keepAliveTimeInNanos;
96
  private final long keepAliveTimeoutInNanos;
97
  private final long maxConnectionIdleInNanos;
98
  private final long maxConnectionAgeInNanos;
99
  private final long maxConnectionAgeGraceInNanos;
100
  private final boolean permitKeepAliveWithoutCalls;
101
  private final long permitKeepAliveTimeInNanos;
102
  private final Attributes eagAttributes;
103
  private final ReferenceCounted sharedResourceReferenceCounter =
1✔
104
      new SharedResourceReferenceCounter();
105
  private final List<? extends ServerStreamTracer.Factory> streamTracerFactories;
106
  private final TransportTracer.Factory transportTracerFactory;
107
  private final InternalChannelz channelz;
108
  private volatile List<InternalInstrumented<SocketStats>> listenSocketStatsList =
1✔
109
      Collections.emptyList();
1✔
110
  private volatile boolean terminated;
111
  private final EventLoop bossExecutor;
112

113
  /**
   * Creates a server that will listen on {@code addresses} once {@link #start} is called.
   *
   * <p>All required arguments are validated before any event loop group is taken from the pools.
   * A rejected argument therefore cannot leave a pooled group checked out with no owner to return
   * it.
   *
   * @throws NullPointerException if {@code addresses}, {@code channelFactory},
   *     {@code channelOptions}, {@code childChannelOptions}, {@code bossGroupPool},
   *     {@code workerGroupPool}, {@code protocolNegotiator}, {@code streamTracerFactories},
   *     {@code eagAttributes} or {@code channelz} is null
   */
  NettyServer(
      List<? extends SocketAddress> addresses,
      ChannelFactory<? extends ServerChannel> channelFactory,
      Map<ChannelOption<?>, ?> channelOptions,
      Map<ChannelOption<?>, ?> childChannelOptions,
      ObjectPool<? extends EventLoopGroup> bossGroupPool,
      ObjectPool<? extends EventLoopGroup> workerGroupPool,
      boolean forceHeapBuffer,
      ProtocolNegotiator protocolNegotiator,
      List<? extends ServerStreamTracer.Factory> streamTracerFactories,
      TransportTracer.Factory transportTracerFactory,
      int maxStreamsPerConnection, boolean autoFlowControl, int flowControlWindow,
      int maxMessageSize, int maxHeaderListSize,
      long keepAliveTimeInNanos, long keepAliveTimeoutInNanos,
      long maxConnectionIdleInNanos,
      long maxConnectionAgeInNanos, long maxConnectionAgeGraceInNanos,
      boolean permitKeepAliveWithoutCalls, long permitKeepAliveTimeInNanos,
      Attributes eagAttributes, InternalChannelz channelz) {
    // Validation first, in the same order as before so the first failing argument still wins.
    this.addresses = checkNotNull(addresses, "addresses");
    this.channelFactory = checkNotNull(channelFactory, "channelFactory");
    checkNotNull(channelOptions, "channelOptions");
    checkNotNull(childChannelOptions, "childChannelOptions");
    this.bossGroupPool = checkNotNull(bossGroupPool, "bossGroupPool");
    this.workerGroupPool = checkNotNull(workerGroupPool, "workerGroupPool");
    this.protocolNegotiator = checkNotNull(protocolNegotiator, "protocolNegotiator");
    this.streamTracerFactories = checkNotNull(streamTracerFactories, "streamTracerFactories");
    this.eagAttributes = checkNotNull(eagAttributes, "eagAttributes");
    this.channelz = Preconditions.checkNotNull(channelz);

    // Defensive copies: later changes to the caller's maps do not affect this server.
    this.channelOptions = new HashMap<ChannelOption<?>, Object>(channelOptions);
    this.childChannelOptions = new HashMap<ChannelOption<?>, Object>(childChannelOptions);

    this.forceHeapBuffer = forceHeapBuffer;
    this.transportTracerFactory = transportTracerFactory;
    this.maxStreamsPerConnection = maxStreamsPerConnection;
    this.autoFlowControl = autoFlowControl;
    this.flowControlWindow = flowControlWindow;
    this.maxMessageSize = maxMessageSize;
    this.maxHeaderListSize = maxHeaderListSize;
    this.keepAliveTimeInNanos = keepAliveTimeInNanos;
    this.keepAliveTimeoutInNanos = keepAliveTimeoutInNanos;
    this.maxConnectionIdleInNanos = maxConnectionIdleInNanos;
    this.maxConnectionAgeInNanos = maxConnectionAgeInNanos;
    this.maxConnectionAgeGraceInNanos = maxConnectionAgeGraceInNanos;
    this.permitKeepAliveWithoutCalls = permitKeepAliveWithoutCalls;
    this.permitKeepAliveTimeInNanos = permitKeepAliveTimeInNanos;
    this.logId = InternalLogId.allocate(getClass(), addresses.isEmpty() ? "No address" :
        String.valueOf(addresses));

    // Pooled resources are acquired last, once nothing above can reject the arguments.
    this.bossGroup = bossGroupPool.getObject();
    this.bossExecutor = bossGroup.next();
    this.channelGroup = new DefaultChannelGroup(this.bossExecutor);
    this.workerGroup = workerGroupPool.getObject();
  }
1✔
164

165
  @Override
166
  public SocketAddress getListenSocketAddress() {
167
    Iterator<Channel> it = channelGroup.iterator();
1✔
168
    if (it.hasNext()) {
1✔
169
      return it.next().localAddress();
1✔
170
    } else {
171
      // server is not listening/bound yet, just return the original port.
172
      return addresses.isEmpty() ? null : addresses.get(0);
1✔
173
    }
174
  }
175

176
  @Override
177
  public List<SocketAddress> getListenSocketAddresses() {
178
    List<SocketAddress> listenSocketAddresses = new ArrayList<>();
1✔
179
    for (Channel c: channelGroup) {
1✔
180
      listenSocketAddresses.add(c.localAddress());
1✔
181
    }
1✔
182
    // server is not listening/bound yet, just return the original ports.
183
    if (listenSocketAddresses.isEmpty())  {
1✔
184
      listenSocketAddresses.addAll(addresses);
1✔
185
    }
186
    return listenSocketAddresses;
1✔
187
  }
188

189
  @Override
190
  public InternalInstrumented<SocketStats> getListenSocketStats() {
191
    List<InternalInstrumented<SocketStats>> savedListenSocketStatsList = listenSocketStatsList;
1✔
192
    return savedListenSocketStatsList.isEmpty() ? null : savedListenSocketStatsList.get(0);
1✔
193
  }
194

195
  @Override
196
  public List<InternalInstrumented<SocketStats>> getListenSocketStatsList() {
197
    return listenSocketStatsList;
1✔
198
  }
199

200
  @Override
201
  public void start(ServerListener serverListener) throws IOException {
202
    listener = checkNotNull(serverListener, "serverListener");
1✔
203

204
    final ServerBootstrap b = new ServerBootstrap();
1✔
205
    b.option(ALLOCATOR, Utils.getByteBufAllocator(forceHeapBuffer));
1✔
206
    b.childOption(ALLOCATOR, Utils.getByteBufAllocator(forceHeapBuffer));
1✔
207
    b.group(bossExecutor, workerGroup);
1✔
208
    b.channelFactory(channelFactory);
1✔
209
    // For non-socket based channel, the option will be ignored.
210
    b.childOption(SO_KEEPALIVE, true);
1✔
211

212
    if (channelOptions != null) {
1✔
213
      for (Map.Entry<ChannelOption<?>, ?> entry : channelOptions.entrySet()) {
1✔
214
        @SuppressWarnings("unchecked")
215
        ChannelOption<Object> key = (ChannelOption<Object>) entry.getKey();
×
216
        b.option(key, entry.getValue());
×
217
      }
×
218
    }
219

220
    if (childChannelOptions != null) {
1✔
221
      for (Map.Entry<ChannelOption<?>, ?> entry : childChannelOptions.entrySet()) {
1✔
222
        @SuppressWarnings("unchecked")
223
        ChannelOption<Object> key = (ChannelOption<Object>) entry.getKey();
1✔
224
        b.childOption(key, entry.getValue());
1✔
225
      }
1✔
226
    }
227

228
    b.childHandler(new ChannelInitializer<Channel>() {
1✔
229
      @Override
230
      public void initChannel(Channel ch) {
231

232
        ChannelPromise channelDone = ch.newPromise();
1✔
233

234
        long maxConnectionAgeInNanos = NettyServer.this.maxConnectionAgeInNanos;
1✔
235
        if (maxConnectionAgeInNanos != MAX_CONNECTION_AGE_NANOS_DISABLED) {
1✔
236
          // apply a random jitter of +/-10% to max connection age
237
          maxConnectionAgeInNanos =
1✔
238
              (long) ((.9D + Math.random() * .2D) * maxConnectionAgeInNanos);
1✔
239
        }
240

241
        NettyServerTransport transport =
1✔
242
            new NettyServerTransport(
243
                ch,
244
                channelDone,
245
                protocolNegotiator,
1✔
246
                streamTracerFactories,
1✔
247
                transportTracerFactory.create(),
1✔
248
                maxStreamsPerConnection,
1✔
249
                autoFlowControl,
1✔
250
                flowControlWindow,
1✔
251
                maxMessageSize,
1✔
252
                maxHeaderListSize,
1✔
253
                keepAliveTimeInNanos,
1✔
254
                keepAliveTimeoutInNanos,
1✔
255
                maxConnectionIdleInNanos,
1✔
256
                maxConnectionAgeInNanos,
257
                maxConnectionAgeGraceInNanos,
1✔
258
                permitKeepAliveWithoutCalls,
1✔
259
                permitKeepAliveTimeInNanos,
1✔
260
                eagAttributes);
1✔
261
        ServerTransportListener transportListener;
262
        // This is to order callbacks on the listener, not to guard access to channel.
263
        synchronized (NettyServer.this) {
1✔
264
          if (terminated) {
1✔
265
            // Server already terminated.
266
            ch.close();
×
267
            return;
×
268
          }
269
          // `channel` shutdown can race with `ch` initialization, so this is only safe to increment
270
          // inside the lock.
271
          sharedResourceReferenceCounter.retain();
1✔
272
          transportListener = listener.transportCreated(transport);
1✔
273
        }
1✔
274

275
        /**
276
         * Releases the event loop if the channel is "done", possibly due to the channel closing.
277
         */
278
        final class LoopReleaser implements ChannelFutureListener {
1✔
279
          private boolean done;
280

281
          @Override
282
          public void operationComplete(ChannelFuture future) throws Exception {
283
            if (!done) {
1✔
284
              done = true;
1✔
285
              sharedResourceReferenceCounter.release();
1✔
286
            }
287
          }
1✔
288
        }
289

290
        transport.start(transportListener);
1✔
291
        ChannelFutureListener loopReleaser = new LoopReleaser();
1✔
292
        channelDone.addListener(loopReleaser);
1✔
293
        ch.closeFuture().addListener(loopReleaser);
1✔
294
      }
1✔
295
    });
296
    Future<Map<ChannelFuture, SocketAddress>> bindCallFuture =
1✔
297
        bossExecutor.submit(
1✔
298
            new Callable<Map<ChannelFuture, SocketAddress>>() {
1✔
299
          @Override
300
          public Map<ChannelFuture, SocketAddress> call() {
301
            Map<ChannelFuture, SocketAddress> bindFutures = new HashMap<>();
1✔
302
            for (SocketAddress address: addresses) {
1✔
303
                ChannelFuture future = b.bind(address);
1✔
304
                channelGroup.add(future.channel());
1✔
305
                bindFutures.put(future, address);
1✔
306
            }
1✔
307
            return bindFutures;
1✔
308
          }
309
        }
310
    );
311
    Map<ChannelFuture, SocketAddress> channelFutures =
1✔
312
        bindCallFuture.awaitUninterruptibly().getNow();
1✔
313

314
    if (!bindCallFuture.isSuccess()) {
1✔
315
      channelGroup.close().awaitUninterruptibly();
1✔
316
      throw new IOException(String.format("Failed to bind to addresses %s",
1✔
317
          addresses), bindCallFuture.cause());
1✔
318
    }
319
    final List<InternalInstrumented<SocketStats>> socketStats = new ArrayList<>();
1✔
320
    for (Map.Entry<ChannelFuture, SocketAddress> entry: channelFutures.entrySet()) {
1✔
321
      // We'd love to observe interruption, but if interrupted we will need to close the channel,
322
      // which itself would need an await() to guarantee the port is not used when the method
323
      // returns. See #6850
324
      final ChannelFuture future = entry.getKey();
1✔
325
      if (!future.awaitUninterruptibly().isSuccess()) {
1✔
326
        channelGroup.close().awaitUninterruptibly();
1✔
327
        throw new IOException(String.format("Failed to bind to address %s",
1✔
328
            entry.getValue()), future.cause());
1✔
329
      }
330
      final InternalInstrumented<SocketStats> listenSocketStats =
1✔
331
          new ListenSocket(future.channel());
1✔
332
      channelz.addListenSocket(listenSocketStats);
1✔
333
      socketStats.add(listenSocketStats);
1✔
334
      future.channel().closeFuture().addListener(new ChannelFutureListener() {
1✔
335
        @Override
336
        public void operationComplete(ChannelFuture future) throws Exception {
337
          channelz.removeListenSocket(listenSocketStats);
1✔
338
        }
1✔
339
      });
340
    }
1✔
341
    listenSocketStatsList = Collections.unmodifiableList(socketStats);
1✔
342
  }
1✔
343

344
  @Override
345
  public void shutdown() {
346
    if (terminated) {
1✔
347
      return;
×
348
    }
349
    ChannelGroupFuture groupFuture = channelGroup.close()
1✔
350
        .addListener(new ChannelGroupFutureListener() {
1✔
351
            @Override
352
            public void operationComplete(ChannelGroupFuture future) throws Exception {
353
              if (!future.isSuccess()) {
1✔
354
                log.log(Level.WARNING, "Error closing server channel group", future.cause());
×
355
              }
356
              sharedResourceReferenceCounter.release();
1✔
357
              protocolNegotiator.close();
1✔
358
              listenSocketStatsList = Collections.emptyList();
1✔
359
              synchronized (NettyServer.this) {
1✔
360
                listener.serverShutdown();
1✔
361
                terminated = true;
1✔
362
              }
1✔
363
            }
1✔
364
        });
365
    try {
366
      groupFuture.await();
1✔
367
    } catch (InterruptedException e) {
×
368
      log.log(Level.FINE, "Interrupted while shutting down", e);
×
369
      Thread.currentThread().interrupt();
×
370
    }
1✔
371
  }
1✔
372

373
  @Override
374
  public InternalLogId getLogId() {
375
    return logId;
×
376
  }
377

378
  @Override
379
  public String toString() {
380
    return MoreObjects.toStringHelper(this)
×
381
        .add("logId", logId.getId())
×
382
        .add("addresses", addresses)
×
383
        .toString();
×
384
  }
385

386
  class SharedResourceReferenceCounter extends AbstractReferenceCounted {
1✔
387
    @Override
388
    protected void deallocate() {
389
      try {
390
        if (bossGroup != null) {
1✔
391
          bossGroupPool.returnObject(bossGroup);
1✔
392
        }
393
      } finally {
394
        bossGroup = null;
1✔
395
        try {
396
          if (workerGroup != null) {
1✔
397
            workerGroupPool.returnObject(workerGroup);
1✔
398
          }
399
        } finally {
400
          workerGroup = null;
1✔
401
        }
402
      }
403
    }
1✔
404

405
    @Override
406
    public ReferenceCounted touch(Object hint) {
407
      return this;
×
408
    }
409
  }
410

411
  /**
412
   * A class that can answer channelz queries about the server listen sockets.
413
   */
414
  private static final class ListenSocket implements InternalInstrumented<SocketStats> {
415
    private final InternalLogId id;
416
    private final Channel ch;
417

418
    ListenSocket(Channel ch) {
1✔
419
      this.ch = ch;
1✔
420
      this.id = InternalLogId.allocate(getClass(), String.valueOf(ch.localAddress()));
1✔
421
    }
1✔
422

423
    @Override
424
    public ListenableFuture<SocketStats> getStats() {
425
      final SettableFuture<SocketStats> ret = SettableFuture.create();
1✔
426
      if (ch.eventLoop().inEventLoop()) {
1✔
427
        // This is necessary, otherwise we will block forever if we get the future from inside
428
        // the event loop.
429
        ret.set(new SocketStats(
×
430
            /*data=*/ null,
431
            ch.localAddress(),
×
432
            /*remote=*/ null,
433
            Utils.getSocketOptions(ch),
×
434
            /*security=*/ null));
435
        return ret;
×
436
      }
437
      ch.eventLoop()
1✔
438
          .submit(
1✔
439
              new Runnable() {
1✔
440
                @Override
441
                public void run() {
442
                  ret.set(new SocketStats(
1✔
443
                      /*data=*/ null,
444
                      ch.localAddress(),
1✔
445
                      /*remote=*/ null,
446
                      Utils.getSocketOptions(ch),
1✔
447
                      /*security=*/ null));
448
                }
1✔
449
              })
450
          .addListener(
1✔
451
              new GenericFutureListener<Future<Object>>() {
1✔
452
                @Override
453
                public void operationComplete(Future<Object> future) throws Exception {
454
                  if (!future.isSuccess()) {
1✔
455
                    ret.setException(future.cause());
×
456
                  }
457
                }
1✔
458
              });
459
      return ret;
1✔
460
    }
461

462
    @Override
463
    public InternalLogId getLogId() {
464
      return id;
1✔
465
    }
466

467
    @Override
468
    public String toString() {
469
      return MoreObjects.toStringHelper(this)
×
470
          .add("logId", id.getId())
×
471
          .add("channel", ch)
×
472
          .toString();
×
473
    }
474
  }
475
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc