• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #20230

31 Mar 2026 09:55AM UTC coverage: 88.734% (+0.01%) from 88.72%
#20230

push

github

web-flow
openTelemetry: add tcp metrics (#12652)

Implements [A80](https://github.com/grpc/proposal/pull/519)

35697 of 40229 relevant lines covered (88.73%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.1
/../netty/src/main/java/io/grpc/netty/NettyServer.java
1
/*
2
 * Copyright 2014 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.netty;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_AGE_NANOS_DISABLED;
21
import static io.netty.channel.ChannelOption.ALLOCATOR;
22
import static io.netty.channel.ChannelOption.SO_KEEPALIVE;
23

24
import com.google.common.base.MoreObjects;
25
import com.google.common.base.Preconditions;
26
import com.google.common.util.concurrent.ListenableFuture;
27
import com.google.common.util.concurrent.SettableFuture;
28
import io.grpc.Attributes;
29
import io.grpc.InternalChannelz;
30
import io.grpc.InternalChannelz.SocketStats;
31
import io.grpc.InternalInstrumented;
32
import io.grpc.InternalLogId;
33
import io.grpc.InternalWithLogId;
34
import io.grpc.MetricRecorder;
35
import io.grpc.ServerStreamTracer;
36
import io.grpc.internal.InternalServer;
37
import io.grpc.internal.ObjectPool;
38
import io.grpc.internal.ServerListener;
39
import io.grpc.internal.ServerTransportListener;
40
import io.grpc.internal.TransportTracer;
41
import io.netty.bootstrap.ServerBootstrap;
42
import io.netty.channel.Channel;
43
import io.netty.channel.ChannelFactory;
44
import io.netty.channel.ChannelFuture;
45
import io.netty.channel.ChannelFutureListener;
46
import io.netty.channel.ChannelInitializer;
47
import io.netty.channel.ChannelOption;
48
import io.netty.channel.ChannelPromise;
49
import io.netty.channel.EventLoop;
50
import io.netty.channel.EventLoopGroup;
51
import io.netty.channel.ServerChannel;
52
import io.netty.channel.group.ChannelGroup;
53
import io.netty.channel.group.ChannelGroupFuture;
54
import io.netty.channel.group.ChannelGroupFutureListener;
55
import io.netty.channel.group.DefaultChannelGroup;
56
import io.netty.util.AbstractReferenceCounted;
57
import io.netty.util.ReferenceCounted;
58
import io.netty.util.concurrent.Future;
59
import io.netty.util.concurrent.GenericFutureListener;
60
import java.io.IOException;
61
import java.net.SocketAddress;
62
import java.util.ArrayList;
63
import java.util.Collections;
64
import java.util.HashMap;
65
import java.util.Iterator;
66
import java.util.List;
67
import java.util.Map;
68
import java.util.concurrent.Callable;
69
import java.util.logging.Level;
70
import java.util.logging.Logger;
71

72
/**
73
 * Netty-based server implementation.
74
 */
75
class NettyServer implements InternalServer, InternalWithLogId {
76
  private static final Logger log = Logger.getLogger(InternalServer.class.getName());
1✔
77

78
  private final InternalLogId logId;
79
  private final List<? extends SocketAddress> addresses;
80
  private final ChannelFactory<? extends ServerChannel> channelFactory;
81
  private final Map<ChannelOption<?>, ?> channelOptions;
82
  private final Map<ChannelOption<?>, ?> childChannelOptions;
83
  private final ProtocolNegotiator protocolNegotiator;
84
  private final int maxStreamsPerConnection;
85
  private final ObjectPool<? extends EventLoopGroup> bossGroupPool;
86
  private final ObjectPool<? extends EventLoopGroup> workerGroupPool;
87
  private final boolean forceHeapBuffer;
88
  private EventLoopGroup bossGroup;
89
  private EventLoopGroup workerGroup;
90
  private ServerListener listener;
91
  private final ChannelGroup channelGroup;
92
  private final boolean autoFlowControl;
93
  private final int flowControlWindow;
94
  private final int maxMessageSize;
95
  private final int maxHeaderListSize;
96
  private final int softLimitHeaderListSize;
97
  private MetricRecorder metricRecorder;
98
  private final long keepAliveTimeInNanos;
99
  private final long keepAliveTimeoutInNanos;
100
  private final long maxConnectionIdleInNanos;
101
  private final long maxConnectionAgeInNanos;
102
  private final long maxConnectionAgeGraceInNanos;
103
  private final boolean permitKeepAliveWithoutCalls;
104
  private final long permitKeepAliveTimeInNanos;
105
  private final int maxRstCount;
106
  private final long maxRstPeriodNanos;
107
  private final Attributes eagAttributes;
108
  private final ReferenceCounted sharedResourceReferenceCounter =
1✔
109
      new SharedResourceReferenceCounter();
110
  private final List<? extends ServerStreamTracer.Factory> streamTracerFactories;
111
  private final TransportTracer.Factory transportTracerFactory;
112
  private final InternalChannelz channelz;
113
  private volatile List<InternalInstrumented<SocketStats>> listenSocketStatsList =
1✔
114
      Collections.emptyList();
1✔
115
  private volatile boolean terminated;
116
  private final EventLoop bossExecutor;
117

118
  /**
   * Creates a server that is configured but not yet bound; call {@link #start} to bind.
   *
   * <p>The two option maps are copied, so later changes to the caller's maps are not observed.
   * One event loop group is taken from each pool here; a single loop of the boss group is
   * selected and used both for the channel group and, later, for binding.
   *
   * @param addresses addresses to bind; must not be null (may be empty)
   * @param channelFactory factory for the listening channels; must not be null
   * @param channelOptions options applied to the listening channels; must not be null
   * @param childChannelOptions options applied to accepted channels; must not be null
   * @param bossGroupPool pool supplying the boss event loop group; must not be null
   * @param workerGroupPool pool supplying the worker event loop group; must not be null
   * @param protocolNegotiator negotiator handed to every transport; must not be null
   * @param streamTracerFactories tracer factories handed to every transport; must not be null
   * @param transportTracerFactory factory asked for one tracer per accepted connection
   * @param eagAttributes attributes handed to every transport; must not be null
   * @param channelz registry in which listen sockets are recorded; must not be null
   * @param metricRecorder recorder handed to every transport; stored as given
   */
  NettyServer(
      List<? extends SocketAddress> addresses,
      ChannelFactory<? extends ServerChannel> channelFactory,
      Map<ChannelOption<?>, ?> channelOptions,
      Map<ChannelOption<?>, ?> childChannelOptions,
      ObjectPool<? extends EventLoopGroup> bossGroupPool,
      ObjectPool<? extends EventLoopGroup> workerGroupPool,
      boolean forceHeapBuffer,
      ProtocolNegotiator protocolNegotiator,
      List<? extends ServerStreamTracer.Factory> streamTracerFactories,
      TransportTracer.Factory transportTracerFactory,
      int maxStreamsPerConnection,
      boolean autoFlowControl,
      int flowControlWindow,
      int maxMessageSize,
      int maxHeaderListSize,
      int softLimitHeaderListSize,
      long keepAliveTimeInNanos,
      long keepAliveTimeoutInNanos,
      long maxConnectionIdleInNanos,
      long maxConnectionAgeInNanos, long maxConnectionAgeGraceInNanos,
      boolean permitKeepAliveWithoutCalls, long permitKeepAliveTimeInNanos,
      int maxRstCount, long maxRstPeriodNanos,
      Attributes eagAttributes, InternalChannelz channelz,
      MetricRecorder metricRecorder) {
    this.addresses = checkNotNull(addresses, "addresses");
    this.metricRecorder = metricRecorder;
    this.channelFactory = checkNotNull(channelFactory, "channelFactory");
    // Defensive copies: the server must not observe later mutation of the caller's maps.
    checkNotNull(channelOptions, "channelOptions");
    this.channelOptions = new HashMap<ChannelOption<?>, Object>(channelOptions);
    checkNotNull(childChannelOptions, "childChannelOptions");
    this.childChannelOptions = new HashMap<ChannelOption<?>, Object>(childChannelOptions);
    this.bossGroupPool = checkNotNull(bossGroupPool, "bossGroupPool");
    this.workerGroupPool = checkNotNull(workerGroupPool, "workerGroupPool");
    this.forceHeapBuffer = forceHeapBuffer;
    // Pick one loop of the boss group up front; the channel group and start() both use it.
    this.bossGroup = bossGroupPool.getObject();
    this.bossExecutor = bossGroup.next();
    this.channelGroup = new DefaultChannelGroup(this.bossExecutor);
    this.workerGroup = workerGroupPool.getObject();
    this.protocolNegotiator = checkNotNull(protocolNegotiator, "protocolNegotiator");
    this.streamTracerFactories = checkNotNull(streamTracerFactories, "streamTracerFactories");
    this.transportTracerFactory = transportTracerFactory;
    this.maxStreamsPerConnection = maxStreamsPerConnection;
    this.autoFlowControl = autoFlowControl;
    this.flowControlWindow = flowControlWindow;
    this.maxMessageSize = maxMessageSize;
    this.maxHeaderListSize = maxHeaderListSize;
    this.softLimitHeaderListSize = softLimitHeaderListSize;
    this.keepAliveTimeInNanos = keepAliveTimeInNanos;
    this.keepAliveTimeoutInNanos = keepAliveTimeoutInNanos;
    this.maxConnectionIdleInNanos = maxConnectionIdleInNanos;
    this.maxConnectionAgeInNanos = maxConnectionAgeInNanos;
    this.maxConnectionAgeGraceInNanos = maxConnectionAgeGraceInNanos;
    this.permitKeepAliveWithoutCalls = permitKeepAliveWithoutCalls;
    this.permitKeepAliveTimeInNanos = permitKeepAliveTimeInNanos;
    this.maxRstCount = maxRstCount;
    this.maxRstPeriodNanos = maxRstPeriodNanos;
    this.eagAttributes = checkNotNull(eagAttributes, "eagAttributes");
    // Labelled like the other checks so a null argument is identifiable from the NPE message.
    this.channelz = Preconditions.checkNotNull(channelz, "channelz");
    this.logId = InternalLogId.allocate(getClass(), addresses.isEmpty() ? "No address" :
        String.valueOf(addresses));
  }
1✔
180

181

182
  @Override
183
  public SocketAddress getListenSocketAddress() {
184
    Iterator<Channel> it = channelGroup.iterator();
1✔
185
    if (it.hasNext()) {
1✔
186
      return it.next().localAddress();
1✔
187
    } else {
188
      // server is not listening/bound yet, just return the original port.
189
      return addresses.isEmpty() ? null : addresses.get(0);
1✔
190
    }
191
  }
192

193
  @Override
194
  public List<SocketAddress> getListenSocketAddresses() {
195
    List<SocketAddress> listenSocketAddresses = new ArrayList<>();
1✔
196
    for (Channel c: channelGroup) {
1✔
197
      listenSocketAddresses.add(c.localAddress());
1✔
198
    }
1✔
199
    // server is not listening/bound yet, just return the original ports.
200
    if (listenSocketAddresses.isEmpty())  {
1✔
201
      listenSocketAddresses.addAll(addresses);
1✔
202
    }
203
    return listenSocketAddresses;
1✔
204
  }
205

206
  @Override
207
  public InternalInstrumented<SocketStats> getListenSocketStats() {
208
    List<InternalInstrumented<SocketStats>> savedListenSocketStatsList = listenSocketStatsList;
1✔
209
    return savedListenSocketStatsList.isEmpty() ? null : savedListenSocketStatsList.get(0);
1✔
210
  }
211

212
  @Override
213
  public List<InternalInstrumented<SocketStats>> getListenSocketStatsList() {
214
    return listenSocketStatsList;
1✔
215
  }
216

217
  @Override
218
  public void start(ServerListener serverListener) throws IOException {
219
    listener = checkNotNull(serverListener, "serverListener");
1✔
220

221
    final ServerBootstrap b = new ServerBootstrap();
1✔
222
    b.option(ALLOCATOR, Utils.getByteBufAllocator(forceHeapBuffer));
1✔
223
    b.childOption(ALLOCATOR, Utils.getByteBufAllocator(forceHeapBuffer));
1✔
224
    b.group(bossExecutor, workerGroup);
1✔
225
    b.channelFactory(channelFactory);
1✔
226
    // For non-socket based channel, the option will be ignored.
227
    b.childOption(SO_KEEPALIVE, true);
1✔
228

229
    if (channelOptions != null) {
1✔
230
      for (Map.Entry<ChannelOption<?>, ?> entry : channelOptions.entrySet()) {
1✔
231
        @SuppressWarnings("unchecked")
232
        ChannelOption<Object> key = (ChannelOption<Object>) entry.getKey();
×
233
        b.option(key, entry.getValue());
×
234
      }
×
235
    }
236

237
    if (childChannelOptions != null) {
1✔
238
      for (Map.Entry<ChannelOption<?>, ?> entry : childChannelOptions.entrySet()) {
1✔
239
        @SuppressWarnings("unchecked")
240
        ChannelOption<Object> key = (ChannelOption<Object>) entry.getKey();
1✔
241
        b.childOption(key, entry.getValue());
1✔
242
      }
1✔
243
    }
244

245
    b.childHandler(new ChannelInitializer<Channel>() {
1✔
246
      @Override
247
      public void initChannel(Channel ch) {
248

249
        ChannelPromise channelDone = ch.newPromise();
1✔
250

251
        long maxConnectionAgeInNanos = NettyServer.this.maxConnectionAgeInNanos;
1✔
252
        if (maxConnectionAgeInNanos != MAX_CONNECTION_AGE_NANOS_DISABLED) {
1✔
253
          // apply a random jitter of +/-10% to max connection age
254
          maxConnectionAgeInNanos =
1✔
255
              (long) ((.9D + Math.random() * .2D) * maxConnectionAgeInNanos);
1✔
256
        }
257

258
            NettyServerTransport transport =
1✔
259
                new NettyServerTransport(
260
                    ch,
261
                    channelDone,
262
                    protocolNegotiator,
1✔
263
                    streamTracerFactories,
1✔
264
                    transportTracerFactory.create(),
1✔
265
                    maxStreamsPerConnection,
1✔
266
                    autoFlowControl,
1✔
267
                    flowControlWindow,
1✔
268
                    maxMessageSize,
1✔
269
                    maxHeaderListSize,
1✔
270
                    softLimitHeaderListSize,
1✔
271
                    keepAliveTimeInNanos,
1✔
272
                    keepAliveTimeoutInNanos,
1✔
273
                    maxConnectionIdleInNanos,
1✔
274
                    maxConnectionAgeInNanos,
275
                    maxConnectionAgeGraceInNanos,
1✔
276
                    permitKeepAliveWithoutCalls,
1✔
277
                    permitKeepAliveTimeInNanos,
1✔
278
                    maxRstCount,
1✔
279
                    maxRstPeriodNanos,
1✔
280
                    eagAttributes,
1✔
281
                    metricRecorder);
1✔
282
        ServerTransportListener transportListener;
283
        // This is to order callbacks on the listener, not to guard access to channel.
284
        synchronized (NettyServer.this) {
1✔
285
          if (terminated) {
1✔
286
            // Server already terminated.
287
            ch.close();
1✔
288
            return;
1✔
289
          }
290
          // `channel` shutdown can race with `ch` initialization, so this is only safe to increment
291
          // inside the lock.
292
          sharedResourceReferenceCounter.retain();
1✔
293
          transportListener = listener.transportCreated(transport);
1✔
294
        }
1✔
295

296
        /* Releases the event loop if the channel is "done", possibly due to the channel closing. */
297
        final class LoopReleaser implements ChannelFutureListener {
1✔
298
          private boolean done;
299

300
          @Override
301
          public void operationComplete(ChannelFuture future) throws Exception {
302
            if (!done) {
1✔
303
              done = true;
1✔
304
              sharedResourceReferenceCounter.release();
1✔
305
            }
306
          }
1✔
307
        }
308

309
        transport.start(transportListener);
1✔
310
        ChannelFutureListener loopReleaser = new LoopReleaser();
1✔
311
        channelDone.addListener(loopReleaser);
1✔
312
        ch.closeFuture().addListener(loopReleaser);
1✔
313
      }
1✔
314
    });
315
    Future<Map<ChannelFuture, SocketAddress>> bindCallFuture =
1✔
316
        bossExecutor.submit(
1✔
317
            new Callable<Map<ChannelFuture, SocketAddress>>() {
1✔
318
          @Override
319
          public Map<ChannelFuture, SocketAddress> call() {
320
            Map<ChannelFuture, SocketAddress> bindFutures = new HashMap<>();
1✔
321
            for (SocketAddress address: addresses) {
1✔
322
                ChannelFuture future = b.bind(address);
1✔
323
                channelGroup.add(future.channel());
1✔
324
                bindFutures.put(future, address);
1✔
325
            }
1✔
326
            return bindFutures;
1✔
327
          }
328
        }
329
    );
330
    Map<ChannelFuture, SocketAddress> channelFutures =
1✔
331
        bindCallFuture.awaitUninterruptibly().getNow();
1✔
332

333
    if (!bindCallFuture.isSuccess()) {
1✔
334
      channelGroup.close().awaitUninterruptibly();
1✔
335
      throw new IOException(String.format("Failed to bind to addresses %s",
1✔
336
          addresses), bindCallFuture.cause());
1✔
337
    }
338
    final List<InternalInstrumented<SocketStats>> socketStats = new ArrayList<>();
1✔
339
    for (Map.Entry<ChannelFuture, SocketAddress> entry: channelFutures.entrySet()) {
1✔
340
      // We'd love to observe interruption, but if interrupted we will need to close the channel,
341
      // which itself would need an await() to guarantee the port is not used when the method
342
      // returns. See #6850
343
      final ChannelFuture future = entry.getKey();
1✔
344
      if (!future.awaitUninterruptibly().isSuccess()) {
1✔
345
        channelGroup.close().awaitUninterruptibly();
1✔
346
        throw new IOException(String.format("Failed to bind to address %s",
1✔
347
            entry.getValue()), future.cause());
1✔
348
      }
349
      final InternalInstrumented<SocketStats> listenSocketStats =
1✔
350
          new ListenSocket(future.channel());
1✔
351
      channelz.addListenSocket(listenSocketStats);
1✔
352
      socketStats.add(listenSocketStats);
1✔
353
      future.channel().closeFuture().addListener(new ChannelFutureListener() {
1✔
354
        @Override
355
        public void operationComplete(ChannelFuture future) throws Exception {
356
          channelz.removeListenSocket(listenSocketStats);
1✔
357
        }
1✔
358
      });
359
    }
1✔
360
    listenSocketStatsList = Collections.unmodifiableList(socketStats);
1✔
361
  }
1✔
362

363
  @Override
364
  public void shutdown() {
365
    if (terminated) {
1✔
366
      return;
×
367
    }
368
    ChannelGroupFuture groupFuture = channelGroup.close()
1✔
369
        .addListener(new ChannelGroupFutureListener() {
1✔
370
            @Override
371
            public void operationComplete(ChannelGroupFuture future) throws Exception {
372
              if (!future.isSuccess()) {
1✔
373
                log.log(Level.WARNING, "Error closing server channel group", future.cause());
×
374
              }
375
              sharedResourceReferenceCounter.release();
1✔
376
              protocolNegotiator.close();
1✔
377
              listenSocketStatsList = Collections.emptyList();
1✔
378
              synchronized (NettyServer.this) {
1✔
379
                listener.serverShutdown();
1✔
380
                terminated = true;
1✔
381
              }
1✔
382
            }
1✔
383
        });
384
    try {
385
      groupFuture.await();
1✔
386
    } catch (InterruptedException e) {
×
387
      log.log(Level.FINE, "Interrupted while shutting down", e);
×
388
      Thread.currentThread().interrupt();
×
389
    }
1✔
390
  }
1✔
391

392
  @Override
393
  public InternalLogId getLogId() {
394
    return logId;
×
395
  }
396

397
  @Override
398
  public String toString() {
399
    return MoreObjects.toStringHelper(this)
×
400
        .add("logId", logId.getId())
×
401
        .add("addresses", addresses)
×
402
        .toString();
×
403
  }
404

405
  class SharedResourceReferenceCounter extends AbstractReferenceCounted {
1✔
406
    @Override
407
    protected void deallocate() {
408
      try {
409
        if (bossGroup != null) {
1✔
410
          bossGroupPool.returnObject(bossGroup);
1✔
411
        }
412
      } finally {
413
        bossGroup = null;
1✔
414
        try {
415
          if (workerGroup != null) {
1✔
416
            workerGroupPool.returnObject(workerGroup);
1✔
417
          }
418
        } finally {
419
          workerGroup = null;
1✔
420
        }
421
      }
422
    }
1✔
423

424
    @Override
425
    public ReferenceCounted touch(Object hint) {
426
      return this;
×
427
    }
428
  }
429

430
  /**
431
   * A class that can answer channelz queries about the server listen sockets.
432
   */
433
  private static final class ListenSocket implements InternalInstrumented<SocketStats> {
434
    private final InternalLogId id;
435
    private final Channel ch;
436

437
    ListenSocket(Channel ch) {
1✔
438
      this.ch = ch;
1✔
439
      this.id = InternalLogId.allocate(getClass(), String.valueOf(ch.localAddress()));
1✔
440
    }
1✔
441

442
    @Override
443
    public ListenableFuture<SocketStats> getStats() {
444
      final SettableFuture<SocketStats> ret = SettableFuture.create();
1✔
445
      if (ch.eventLoop().inEventLoop()) {
1✔
446
        // This is necessary, otherwise we will block forever if we get the future from inside
447
        // the event loop.
448
        ret.set(new SocketStats(
×
449
            /*data=*/ null,
450
            ch.localAddress(),
×
451
            /*remote=*/ null,
452
            Utils.getSocketOptions(ch),
×
453
            /*security=*/ null));
454
        return ret;
×
455
      }
456
      ch.eventLoop()
1✔
457
          .submit(
1✔
458
              new Runnable() {
1✔
459
                @Override
460
                public void run() {
461
                  ret.set(new SocketStats(
1✔
462
                      /*data=*/ null,
463
                      ch.localAddress(),
1✔
464
                      /*remote=*/ null,
465
                      Utils.getSocketOptions(ch),
1✔
466
                      /*security=*/ null));
467
                }
1✔
468
              })
469
          .addListener(
1✔
470
              new GenericFutureListener<Future<Object>>() {
1✔
471
                @Override
472
                public void operationComplete(Future<Object> future) throws Exception {
473
                  if (!future.isSuccess()) {
1✔
474
                    ret.setException(future.cause());
×
475
                  }
476
                }
1✔
477
              });
478
      return ret;
1✔
479
    }
480

481
    @Override
482
    public InternalLogId getLogId() {
483
      return id;
1✔
484
    }
485

486
    @Override
487
    public String toString() {
488
      return MoreObjects.toStringHelper(this)
×
489
          .add("logId", id.getId())
×
490
          .add("channel", ch)
×
491
          .toString();
×
492
    }
493
  }
494
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc