• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #20230

31 Mar 2026 09:55AM UTC coverage: 88.734% (+0.01%) from 88.72%
#20230

push

github

web-flow
openTelemetry: add tcp metrics (#12652)

Implements [A80](https://github.com/grpc/proposal/pull/519)

35697 of 40229 relevant lines covered (88.73%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.81
/../netty/src/main/java/io/grpc/netty/NettyClientTransport.java
1
/*
2
 * Copyright 2014 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.netty;
18

19
import static io.grpc.internal.GrpcUtil.KEEPALIVE_TIME_NANOS_DISABLED;
20
import static io.netty.channel.ChannelOption.ALLOCATOR;
21
import static io.netty.channel.ChannelOption.SO_KEEPALIVE;
22

23
import com.google.common.annotations.VisibleForTesting;
24
import com.google.common.base.MoreObjects;
25
import com.google.common.base.Preconditions;
26
import com.google.common.base.Ticker;
27
import com.google.common.util.concurrent.ListenableFuture;
28
import com.google.common.util.concurrent.SettableFuture;
29
import io.grpc.Attributes;
30
import io.grpc.CallOptions;
31
import io.grpc.ChannelLogger;
32
import io.grpc.ClientStreamTracer;
33
import io.grpc.InternalChannelz.SocketStats;
34
import io.grpc.InternalLogId;
35
import io.grpc.Metadata;
36
import io.grpc.MethodDescriptor;
37
import io.grpc.MetricRecorder;
38
import io.grpc.Status;
39
import io.grpc.internal.ClientStream;
40
import io.grpc.internal.ConnectionClientTransport;
41
import io.grpc.internal.DisconnectError;
42
import io.grpc.internal.FailingClientStream;
43
import io.grpc.internal.GrpcUtil;
44
import io.grpc.internal.Http2Ping;
45
import io.grpc.internal.KeepAliveManager;
46
import io.grpc.internal.KeepAliveManager.ClientKeepAlivePinger;
47
import io.grpc.internal.SimpleDisconnectError;
48
import io.grpc.internal.StatsTraceContext;
49
import io.grpc.internal.TransportTracer;
50
import io.grpc.netty.NettyChannelBuilder.LocalSocketPicker;
51
import io.netty.bootstrap.Bootstrap;
52
import io.netty.channel.Channel;
53
import io.netty.channel.ChannelFactory;
54
import io.netty.channel.ChannelFuture;
55
import io.netty.channel.ChannelFutureListener;
56
import io.netty.channel.ChannelHandler;
57
import io.netty.channel.ChannelOption;
58
import io.netty.channel.EventLoop;
59
import io.netty.channel.EventLoopGroup;
60
import io.netty.handler.codec.http2.StreamBufferingEncoder.Http2ChannelClosedException;
61
import io.netty.util.AsciiString;
62
import io.netty.util.concurrent.Future;
63
import io.netty.util.concurrent.GenericFutureListener;
64
import java.net.SocketAddress;
65
import java.nio.channels.ClosedChannelException;
66
import java.util.Map;
67
import java.util.concurrent.Executor;
68
import java.util.concurrent.TimeUnit;
69
import javax.annotation.Nullable;
70

71
/**
72
 * A Netty-based {@link ConnectionClientTransport} implementation.
73
 */
74
class NettyClientTransport implements ConnectionClientTransport,
1✔
75
    ClientKeepAlivePinger.TransportWithDisconnectReason {
76

77
  private final InternalLogId logId;
78
  private final Map<ChannelOption<?>, ?> channelOptions;
79
  private final SocketAddress remoteAddress;
80
  private final ChannelFactory<? extends Channel> channelFactory;
81
  private final EventLoopGroup group;
82
  private final ProtocolNegotiator negotiator;
83
  private final String authorityString;
84
  private final AsciiString authority;
85
  private final AsciiString userAgent;
86
  private final boolean autoFlowControl;
87
  private final int flowControlWindow;
88
  private final int maxMessageSize;
89
  private final int maxHeaderListSize;
90
  private final int softLimitHeaderListSize;
91
  private KeepAliveManager keepAliveManager;
92
  private final long keepAliveTimeNanos;
93
  private final long keepAliveTimeoutNanos;
94
  private final boolean keepAliveWithoutCalls;
95
  private final AsciiString negotiationScheme;
96
  private final Runnable tooManyPingsRunnable;
97
  private NettyClientHandler handler;
98
  // We should not send on the channel until negotiation completes. This is a hard requirement
99
  // by SslHandler but is appropriate for HTTP/1.1 Upgrade as well.
100
  private Channel channel;
101
  /** If {@link #start} has been called, non-{@code null} if channel is {@code null}. */
102
  private Status statusExplainingWhyTheChannelIsNull;
103
  /** Since not thread-safe, may only be used from event loop. */
104
  private ClientTransportLifecycleManager lifecycleManager;
105
  /** Since not thread-safe, may only be used from event loop. */
106
  private final TransportTracer transportTracer;
107
  private final Attributes eagAttributes;
108
  private final LocalSocketPicker localSocketPicker;
109
  private final ChannelLogger channelLogger;
110
  private final boolean useGetForSafeMethods;
111
  private final Ticker ticker;
112
  private final MetricRecorder metricRecorder;
113

114

115
  /**
   * Captures the configuration for a transport; no channel is created here.
   *
   * <p>{@code negotiator}, {@code address}, {@code group}, {@code channelOptions},
   * {@code tooManyPingsRunnable}, {@code transportTracer}, {@code eagAttributes},
   * {@code localSocketPicker}, {@code channelLogger} and {@code ticker} are rejected if
   * {@code null}. The remaining arguments are stored without a null check.
   */
  NettyClientTransport(
      SocketAddress address,
      ChannelFactory<? extends Channel> channelFactory,
      Map<ChannelOption<?>, ?> channelOptions,
      EventLoopGroup group,
      ProtocolNegotiator negotiator,
      boolean autoFlowControl,
      int flowControlWindow,
      int maxMessageSize,
      int maxHeaderListSize,
      int softLimitHeaderListSize,
      long keepAliveTimeNanos,
      long keepAliveTimeoutNanos,
      boolean keepAliveWithoutCalls,
      String authority,
      @Nullable String userAgent,
      Runnable tooManyPingsRunnable,
      TransportTracer transportTracer,
      Attributes eagAttributes,
      LocalSocketPicker localSocketPicker,
      ChannelLogger channelLogger,
      boolean useGetForSafeMethods,
      MetricRecorder metricRecorder,
      Ticker ticker) {

    // Negotiation and addressing.
    this.negotiator = Preconditions.checkNotNull(negotiator, "negotiator");
    this.negotiationScheme = negotiator.scheme();
    this.remoteAddress = Preconditions.checkNotNull(address, "address");
    this.group = Preconditions.checkNotNull(group, "group");
    this.channelFactory = channelFactory;
    this.channelOptions = Preconditions.checkNotNull(channelOptions, "channelOptions");

    // Flow control and size limits.
    this.autoFlowControl = autoFlowControl;
    this.flowControlWindow = flowControlWindow;
    this.maxMessageSize = maxMessageSize;
    this.maxHeaderListSize = maxHeaderListSize;
    this.softLimitHeaderListSize = softLimitHeaderListSize;

    // Keepalive settings.
    this.keepAliveTimeNanos = keepAliveTimeNanos;
    this.keepAliveTimeoutNanos = keepAliveTimeoutNanos;
    this.keepAliveWithoutCalls = keepAliveWithoutCalls;

    // The authority is kept both as a String and as an AsciiString.
    this.authorityString = authority;
    this.authority = new AsciiString(authority);
    this.userAgent = new AsciiString(GrpcUtil.getGrpcUserAgent("netty", userAgent));

    this.tooManyPingsRunnable =
        Preconditions.checkNotNull(tooManyPingsRunnable, "tooManyPingsRunnable");
    this.transportTracer = Preconditions.checkNotNull(transportTracer, "transportTracer");
    this.eagAttributes = Preconditions.checkNotNull(eagAttributes, "eagAttributes");
    this.localSocketPicker = Preconditions.checkNotNull(localSocketPicker, "localSocketPicker");
    this.logId = InternalLogId.allocate(getClass(), this.remoteAddress.toString());
    this.channelLogger = Preconditions.checkNotNull(channelLogger, "channelLogger");
    this.useGetForSafeMethods = useGetForSafeMethods;
    this.metricRecorder = metricRecorder;
    this.ticker = Preconditions.checkNotNull(ticker, "ticker");
  }
1✔
168

169
  @Override
170
  public void ping(final PingCallback callback, final Executor executor) {
171
    if (channel == null) {
1✔
172
      executor.execute(new Runnable() {
1✔
173
        @Override
174
        public void run() {
175
          callback.onFailure(statusExplainingWhyTheChannelIsNull);
1✔
176
        }
1✔
177
      });
178
      return;
1✔
179
    }
180
    // The promise and listener always succeed in NettyClientHandler. So this listener handles the
181
    // error case, when the channel is closed and the NettyClientHandler no longer in the pipeline.
182
    ChannelFutureListener failureListener = new ChannelFutureListener() {
1✔
183
      @Override
184
      public void operationComplete(ChannelFuture future) throws Exception {
185
        if (!future.isSuccess()) {
1✔
186
          Status s = statusFromFailedFuture(future);
1✔
187
          Http2Ping.notifyFailed(callback, executor, s);
1✔
188
        }
189
      }
1✔
190
    };
191
    // Write the command requesting the ping
192
    handler.getWriteQueue().enqueue(new SendPingCommand(callback, executor), true)
1✔
193
        .addListener(failureListener);
1✔
194
  }
1✔
195

196
  @Override
197
  public ClientStream newStream(
198
      MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions,
199
      ClientStreamTracer[] tracers) {
200
    Preconditions.checkNotNull(method, "method");
1✔
201
    Preconditions.checkNotNull(headers, "headers");
1✔
202
    if (channel == null) {
1✔
203
      return new FailingClientStream(statusExplainingWhyTheChannelIsNull, tracers);
1✔
204
    }
205
    StatsTraceContext statsTraceCtx =
1✔
206
        StatsTraceContext.newClientContext(tracers, getAttributes(), headers);
1✔
207
    return new NettyClientStream(
1✔
208
        new NettyClientStream.TransportState(
209
            handler,
210
            channel.eventLoop(),
1✔
211
            maxMessageSize,
212
            statsTraceCtx,
213
            transportTracer,
214
            method.getFullMethodName(),
1✔
215
            callOptions) {
1✔
216
          @Override
217
          protected Status statusFromFailedFuture(ChannelFuture f) {
218
            return NettyClientTransport.this.statusFromFailedFuture(f);
1✔
219
          }
220
        },
221
        method,
222
        headers,
223
        channel,
224
        authority,
225
        negotiationScheme,
226
        userAgent,
227
        statsTraceCtx,
228
        transportTracer,
229
        callOptions,
230
        useGetForSafeMethods);
231
  }
232

233
  @SuppressWarnings("unchecked")
234
  @Override
235
  public Runnable start(Listener transportListener) {
236
    lifecycleManager = new ClientTransportLifecycleManager(
1✔
237
        Preconditions.checkNotNull(transportListener, "listener"));
1✔
238
    EventLoop eventLoop = group.next();
1✔
239
    if (keepAliveTimeNanos != KEEPALIVE_TIME_NANOS_DISABLED) {
1✔
240
      keepAliveManager = new KeepAliveManager(
1✔
241
          new ClientKeepAlivePinger(this), eventLoop, keepAliveTimeNanos,
242
          keepAliveTimeoutNanos, keepAliveWithoutCalls);
243
    }
244

245
    handler = NettyClientHandler.newHandler(
1✔
246
            lifecycleManager,
247
            keepAliveManager,
248
            autoFlowControl,
249
            flowControlWindow,
250
            maxHeaderListSize,
251
            softLimitHeaderListSize,
252
            GrpcUtil.STOPWATCH_SUPPLIER,
253
            tooManyPingsRunnable,
254
            transportTracer,
255
            eagAttributes,
256
            authorityString,
257
            channelLogger,
258
            ticker,
259
            metricRecorder);
260

261
    ChannelHandler negotiationHandler = negotiator.newHandler(handler);
1✔
262

263
    Bootstrap b = new Bootstrap();
1✔
264
    b.option(ALLOCATOR, Utils.getByteBufAllocator(false));
1✔
265
    b.group(eventLoop);
1✔
266
    b.channelFactory(channelFactory);
1✔
267
    // For non-socket based channel, the option will be ignored.
268
    b.option(SO_KEEPALIVE, true);
1✔
269
    for (Map.Entry<ChannelOption<?>, ?> entry : channelOptions.entrySet()) {
1✔
270
      // Every entry in the map is obtained from
271
      // NettyChannelBuilder#withOption(ChannelOption<T> option, T value)
272
      // so it is safe to pass the key-value pair to b.option().
273
      b.option((ChannelOption<Object>) entry.getKey(), entry.getValue());
1✔
274
    }
1✔
275

276
    ChannelHandler bufferingHandler = new WriteBufferingAndExceptionHandler(negotiationHandler);
1✔
277

278
    /*
279
     * We don't use a ChannelInitializer in the client bootstrap because its "initChannel" method
280
     * is executed in the event loop and we need this handler to be in the pipeline immediately so
281
     * that it may begin buffering writes.
282
     */
283
    b.handler(bufferingHandler);
1✔
284
    ChannelFuture regFuture = b.register();
1✔
285
    if (regFuture.isDone() && !regFuture.isSuccess()) {
1✔
286
      channel = null;
1✔
287
      // Initialization has failed badly. All new streams should be made to fail.
288
      Throwable t = regFuture.cause();
1✔
289
      if (t == null) {
1✔
290
        t = new IllegalStateException("Channel is null, but future doesn't have a cause");
×
291
      }
292
      statusExplainingWhyTheChannelIsNull = Utils.statusFromThrowable(t);
1✔
293
      // Use a Runnable since lifecycleManager calls transportListener
294
      return new Runnable() {
1✔
295
        @Override
296
        public void run() {
297
          // NOTICE: we not are calling lifecycleManager from the event loop. But there isn't really
298
          // an event loop in this case, so nothing should be accessing the lifecycleManager. We
299
          // could use GlobalEventExecutor (which is what regFuture would use for notifying
300
          // listeners in this case), but avoiding on-demand thread creation in an error case seems
301
          // a good idea and is probably clearer threading.
302
          lifecycleManager.notifyTerminated(statusExplainingWhyTheChannelIsNull,
1✔
303
              SimpleDisconnectError.UNKNOWN);
304
        }
1✔
305
      };
306
    }
307
    channel = regFuture.channel();
1✔
308
    // For non-epoll based channel, the option will be ignored.
309
    try {
310
      if (keepAliveTimeNanos != KEEPALIVE_TIME_NANOS_DISABLED
1✔
311
              && Class.forName("io.netty.channel.epoll.AbstractEpollChannel").isInstance(channel)) {
1✔
312
        ChannelOption<Integer> tcpUserTimeout = Utils.maybeGetTcpUserTimeoutOption();
1✔
313
        if (tcpUserTimeout != null) {
1✔
314
          int tcpUserTimeoutMs = (int) TimeUnit.NANOSECONDS.toMillis(keepAliveTimeoutNanos);
1✔
315
          channel.config().setOption(tcpUserTimeout, tcpUserTimeoutMs);
1✔
316
        }
317
      }
318
    } catch (ClassNotFoundException ignored) {
×
319
      // JVM did not load AbstractEpollChannel, so the current channel will not be of epoll type,
320
      // so there is no need to set TCP_USER_TIMEOUT
321
    }
1✔
322
    // Start the write queue as soon as the channel is constructed
323
    handler.startWriteQueue(channel);
1✔
324
    // This write will have no effect, yet it will only complete once the negotiationHandler
325
    // flushes any pending writes. We need it to be staged *before* the `connect` so that
326
    // the channel can't have been closed yet, removing all handlers. This write will sit in the
327
    // AbstractBufferingHandler's buffer, and will either be flushed on a successful connection,
328
    // or failed if the connection fails.
329
    channel.writeAndFlush(NettyClientHandler.NOOP_MESSAGE).addListener(new ChannelFutureListener() {
1✔
330
      @Override
331
      public void operationComplete(ChannelFuture future) throws Exception {
332
        if (!future.isSuccess()) {
1✔
333
          // Need to notify of this failure, because NettyClientHandler may not have been added to
334
          // the pipeline before the error occurred.
335
          lifecycleManager.notifyTerminated(Utils.statusFromThrowable(future.cause()),
1✔
336
              SimpleDisconnectError.UNKNOWN);
337
        }
338
      }
1✔
339
    });
340
    // Start the connection operation to the server.
341
    SocketAddress localAddress =
1✔
342
        localSocketPicker.createSocketAddress(remoteAddress, eagAttributes);
1✔
343
    if (localAddress != null) {
1✔
344
      channel.connect(remoteAddress, localAddress);
×
345
    } else {
346
      channel.connect(remoteAddress);
1✔
347
    }
348

349
    if (keepAliveManager != null) {
1✔
350
      keepAliveManager.onTransportStarted();
1✔
351
    }
352

353
    return null;
1✔
354
  }
355

356
  @Override
357
  public void shutdown(Status reason) {
358
    // start() could have failed
359
    if (channel == null) {
1✔
360
      return;
1✔
361
    }
362
    // Notifying of termination is automatically done when the channel closes.
363
    if (channel.isOpen()) {
1✔
364
      handler.getWriteQueue().enqueue(new GracefulCloseCommand(reason), true);
1✔
365
    }
366
  }
1✔
367

368
  @Override
369
  public void shutdownNow(final Status reason) {
370
    shutdownNow(reason, SimpleDisconnectError.SUBCHANNEL_SHUTDOWN);
1✔
371
  }
1✔
372

373
  @Override
374
  public void shutdownNow(final Status reason, DisconnectError disconnectError) {
375
    // Notifying of termination is automatically done when the channel closes.
376
    if (channel != null && channel.isOpen()) {
1✔
377
      handler.getWriteQueue().enqueue(new Runnable() {
1✔
378
        @Override
379
        public void run() {
380
          lifecycleManager.notifyShutdown(reason, disconnectError);
1✔
381
          channel.write(new ForcefulCloseCommand(reason));
1✔
382
        }
1✔
383
      }, true);
384
    }
385
  }
1✔
386

387
  @Override
388
  public String toString() {
389
    return MoreObjects.toStringHelper(this)
1✔
390
        .add("logId", logId.getId())
1✔
391
        .add("remoteAddress", remoteAddress)
1✔
392
        .add("channel", channel)
1✔
393
        .toString();
1✔
394
  }
395

396
  @Override
397
  public InternalLogId getLogId() {
398
    return logId;
1✔
399
  }
400

401
  @Override
402
  public Attributes getAttributes() {
403
    return handler.getAttributes();
1✔
404
  }
405

406
  @Override
407
  public ListenableFuture<SocketStats> getStats() {
408
    final SettableFuture<SocketStats> result = SettableFuture.create();
1✔
409
    if (channel.eventLoop().inEventLoop()) {
1✔
410
      // This is necessary, otherwise we will block forever if we get the future from inside
411
      // the event loop.
412
      result.set(getStatsHelper(channel));
×
413
      return result;
×
414
    }
415
    channel.eventLoop().submit(
1✔
416
        new Runnable() {
1✔
417
          @Override
418
          public void run() {
419
            result.set(getStatsHelper(channel));
1✔
420
          }
1✔
421
        })
422
        .addListener(
1✔
423
            new GenericFutureListener<Future<Object>>() {
1✔
424
              @Override
425
              public void operationComplete(Future<Object> future) throws Exception {
426
                if (!future.isSuccess()) {
1✔
427
                  result.setException(future.cause());
×
428
                }
429
              }
1✔
430
            });
431
    return result;
1✔
432
  }
433

434
  private SocketStats getStatsHelper(Channel ch) {
435
    assert ch.eventLoop().inEventLoop();
1✔
436
    return new SocketStats(
1✔
437
        transportTracer.getStats(),
1✔
438
        channel.localAddress(),
1✔
439
        channel.remoteAddress(),
1✔
440
        Utils.getSocketOptions(ch),
1✔
441
        handler == null ? null : handler.getSecurityInfo());
1✔
442
  }
443

444
  @VisibleForTesting
445
  Channel channel() {
446
    return channel;
1✔
447
  }
448

449
  @VisibleForTesting
450
  KeepAliveManager keepAliveManager() {
451
    return keepAliveManager;
1✔
452
  }
453

454
  /**
455
   * Convert ChannelFuture.cause() to a Status, taking into account that all handlers are removed
456
   * from the pipeline when the channel is closed. Since handlers are removed, you may get an
457
   * unhelpful exception like ClosedChannelException.
458
   *
459
   * <p>This method must only be called on the event loop.
460
   */
461
  private Status statusFromFailedFuture(ChannelFuture f) {
462
    Throwable t = f.cause();
1✔
463
    if (t instanceof ClosedChannelException
1✔
464
        // Exception thrown by the StreamBufferingEncoder if the channel is closed while there
465
        // are still streams buffered. This exception is not helpful. Replace it by the real
466
        // cause of the shutdown (if available).
467
        || t instanceof Http2ChannelClosedException) {
468
      Status shutdownStatus = lifecycleManager.getShutdownStatus();
1✔
469
      if (shutdownStatus == null) {
1✔
470
        return Status.UNKNOWN.withDescription("Channel closed but for unknown reason")
×
471
            .withCause(new ClosedChannelException().initCause(t));
×
472
      }
473
      return shutdownStatus;
1✔
474
    }
475
    return Utils.statusFromThrowable(t);
1✔
476
  }
477
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc