• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #20126

23 Dec 2025 05:06AM UTC coverage: 88.691% (-0.02%) from 88.706%
#20126

push

github

web-flow
opentelemetry: plumb subchannel metrics disconnect error (#12342)

Finishes the remaining work of
[A94](https://github.com/grpc/proposal/pull/485/files) i.e. the plumbing the disconnect error

35472 of 39995 relevant lines covered (88.69%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.77
/../netty/src/main/java/io/grpc/netty/NettyClientTransport.java
1
/*
2
 * Copyright 2014 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.netty;
18

19
import static io.grpc.internal.GrpcUtil.KEEPALIVE_TIME_NANOS_DISABLED;
20
import static io.netty.channel.ChannelOption.ALLOCATOR;
21
import static io.netty.channel.ChannelOption.SO_KEEPALIVE;
22

23
import com.google.common.annotations.VisibleForTesting;
24
import com.google.common.base.MoreObjects;
25
import com.google.common.base.Preconditions;
26
import com.google.common.base.Ticker;
27
import com.google.common.util.concurrent.ListenableFuture;
28
import com.google.common.util.concurrent.SettableFuture;
29
import io.grpc.Attributes;
30
import io.grpc.CallOptions;
31
import io.grpc.ChannelLogger;
32
import io.grpc.ClientStreamTracer;
33
import io.grpc.InternalChannelz.SocketStats;
34
import io.grpc.InternalLogId;
35
import io.grpc.Metadata;
36
import io.grpc.MethodDescriptor;
37
import io.grpc.Status;
38
import io.grpc.internal.ClientStream;
39
import io.grpc.internal.ConnectionClientTransport;
40
import io.grpc.internal.DisconnectError;
41
import io.grpc.internal.FailingClientStream;
42
import io.grpc.internal.GrpcUtil;
43
import io.grpc.internal.Http2Ping;
44
import io.grpc.internal.KeepAliveManager;
45
import io.grpc.internal.KeepAliveManager.ClientKeepAlivePinger;
46
import io.grpc.internal.SimpleDisconnectError;
47
import io.grpc.internal.StatsTraceContext;
48
import io.grpc.internal.TransportTracer;
49
import io.grpc.netty.NettyChannelBuilder.LocalSocketPicker;
50
import io.netty.bootstrap.Bootstrap;
51
import io.netty.channel.Channel;
52
import io.netty.channel.ChannelFactory;
53
import io.netty.channel.ChannelFuture;
54
import io.netty.channel.ChannelFutureListener;
55
import io.netty.channel.ChannelHandler;
56
import io.netty.channel.ChannelOption;
57
import io.netty.channel.EventLoop;
58
import io.netty.channel.EventLoopGroup;
59
import io.netty.handler.codec.http2.StreamBufferingEncoder.Http2ChannelClosedException;
60
import io.netty.util.AsciiString;
61
import io.netty.util.concurrent.Future;
62
import io.netty.util.concurrent.GenericFutureListener;
63
import java.net.SocketAddress;
64
import java.nio.channels.ClosedChannelException;
65
import java.util.Map;
66
import java.util.concurrent.Executor;
67
import java.util.concurrent.TimeUnit;
68
import javax.annotation.Nullable;
69

70
/**
71
 * A Netty-based {@link ConnectionClientTransport} implementation.
72
 */
73
class NettyClientTransport implements ConnectionClientTransport,
1✔
74
    ClientKeepAlivePinger.TransportWithDisconnectReason {
75

76
  private final InternalLogId logId;
77
  private final Map<ChannelOption<?>, ?> channelOptions;
78
  private final SocketAddress remoteAddress;
79
  private final ChannelFactory<? extends Channel> channelFactory;
80
  private final EventLoopGroup group;
81
  private final ProtocolNegotiator negotiator;
82
  private final String authorityString;
83
  private final AsciiString authority;
84
  private final AsciiString userAgent;
85
  private final boolean autoFlowControl;
86
  private final int flowControlWindow;
87
  private final int maxMessageSize;
88
  private final int maxHeaderListSize;
89
  private final int softLimitHeaderListSize;
90
  private KeepAliveManager keepAliveManager;
91
  private final long keepAliveTimeNanos;
92
  private final long keepAliveTimeoutNanos;
93
  private final boolean keepAliveWithoutCalls;
94
  private final AsciiString negotiationScheme;
95
  private final Runnable tooManyPingsRunnable;
96
  private NettyClientHandler handler;
97
  // We should not send on the channel until negotiation completes. This is a hard requirement
98
  // by SslHandler but is appropriate for HTTP/1.1 Upgrade as well.
99
  private Channel channel;
100
  /** If {@link #start} has been called, non-{@code null} if channel is {@code null}. */
101
  private Status statusExplainingWhyTheChannelIsNull;
102
  /** Since not thread-safe, may only be used from event loop. */
103
  private ClientTransportLifecycleManager lifecycleManager;
104
  /** Since not thread-safe, may only be used from event loop. */
105
  private final TransportTracer transportTracer;
106
  private final Attributes eagAttributes;
107
  private final LocalSocketPicker localSocketPicker;
108
  private final ChannelLogger channelLogger;
109
  private final boolean useGetForSafeMethods;
110
  private final Ticker ticker;
111

112

113
  NettyClientTransport(
114
      SocketAddress address,
115
      ChannelFactory<? extends Channel> channelFactory,
116
      Map<ChannelOption<?>, ?> channelOptions,
117
      EventLoopGroup group,
118
      ProtocolNegotiator negotiator,
119
      boolean autoFlowControl,
120
      int flowControlWindow,
121
      int maxMessageSize,
122
      int maxHeaderListSize,
123
      int softLimitHeaderListSize,
124
      long keepAliveTimeNanos,
125
      long keepAliveTimeoutNanos,
126
      boolean keepAliveWithoutCalls,
127
      String authority,
128
      @Nullable String userAgent,
129
      Runnable tooManyPingsRunnable,
130
      TransportTracer transportTracer,
131
      Attributes eagAttributes,
132
      LocalSocketPicker localSocketPicker,
133
      ChannelLogger channelLogger,
134
      boolean useGetForSafeMethods,
135
      Ticker ticker) {
1✔
136

137
    this.negotiator = Preconditions.checkNotNull(negotiator, "negotiator");
1✔
138
    this.negotiationScheme = this.negotiator.scheme();
1✔
139
    this.remoteAddress = Preconditions.checkNotNull(address, "address");
1✔
140
    this.group = Preconditions.checkNotNull(group, "group");
1✔
141
    this.channelFactory = channelFactory;
1✔
142
    this.channelOptions = Preconditions.checkNotNull(channelOptions, "channelOptions");
1✔
143
    this.autoFlowControl = autoFlowControl;
1✔
144
    this.flowControlWindow = flowControlWindow;
1✔
145
    this.maxMessageSize = maxMessageSize;
1✔
146
    this.maxHeaderListSize = maxHeaderListSize;
1✔
147
    this.softLimitHeaderListSize = softLimitHeaderListSize;
1✔
148
    this.keepAliveTimeNanos = keepAliveTimeNanos;
1✔
149
    this.keepAliveTimeoutNanos = keepAliveTimeoutNanos;
1✔
150
    this.keepAliveWithoutCalls = keepAliveWithoutCalls;
1✔
151
    this.authorityString = authority;
1✔
152
    this.authority = new AsciiString(authority);
1✔
153
    this.userAgent = new AsciiString(GrpcUtil.getGrpcUserAgent("netty", userAgent));
1✔
154
    this.tooManyPingsRunnable =
1✔
155
        Preconditions.checkNotNull(tooManyPingsRunnable, "tooManyPingsRunnable");
1✔
156
    this.transportTracer = Preconditions.checkNotNull(transportTracer, "transportTracer");
1✔
157
    this.eagAttributes = Preconditions.checkNotNull(eagAttributes, "eagAttributes");
1✔
158
    this.localSocketPicker = Preconditions.checkNotNull(localSocketPicker, "localSocketPicker");
1✔
159
    this.logId = InternalLogId.allocate(getClass(), remoteAddress.toString());
1✔
160
    this.channelLogger = Preconditions.checkNotNull(channelLogger, "channelLogger");
1✔
161
    this.useGetForSafeMethods = useGetForSafeMethods;
1✔
162
    this.ticker = Preconditions.checkNotNull(ticker, "ticker");
1✔
163
  }
1✔
164

165
  @Override
166
  public void ping(final PingCallback callback, final Executor executor) {
167
    if (channel == null) {
1✔
168
      executor.execute(new Runnable() {
1✔
169
        @Override
170
        public void run() {
171
          callback.onFailure(statusExplainingWhyTheChannelIsNull);
1✔
172
        }
1✔
173
      });
174
      return;
1✔
175
    }
176
    // The promise and listener always succeed in NettyClientHandler. So this listener handles the
177
    // error case, when the channel is closed and the NettyClientHandler no longer in the pipeline.
178
    ChannelFutureListener failureListener = new ChannelFutureListener() {
1✔
179
      @Override
180
      public void operationComplete(ChannelFuture future) throws Exception {
181
        if (!future.isSuccess()) {
1✔
182
          Status s = statusFromFailedFuture(future);
1✔
183
          Http2Ping.notifyFailed(callback, executor, s);
1✔
184
        }
185
      }
1✔
186
    };
187
    // Write the command requesting the ping
188
    handler.getWriteQueue().enqueue(new SendPingCommand(callback, executor), true)
1✔
189
        .addListener(failureListener);
1✔
190
  }
1✔
191

192
  @Override
193
  public ClientStream newStream(
194
      MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions,
195
      ClientStreamTracer[] tracers) {
196
    Preconditions.checkNotNull(method, "method");
1✔
197
    Preconditions.checkNotNull(headers, "headers");
1✔
198
    if (channel == null) {
1✔
199
      return new FailingClientStream(statusExplainingWhyTheChannelIsNull, tracers);
1✔
200
    }
201
    StatsTraceContext statsTraceCtx =
1✔
202
        StatsTraceContext.newClientContext(tracers, getAttributes(), headers);
1✔
203
    return new NettyClientStream(
1✔
204
        new NettyClientStream.TransportState(
205
            handler,
206
            channel.eventLoop(),
1✔
207
            maxMessageSize,
208
            statsTraceCtx,
209
            transportTracer,
210
            method.getFullMethodName(),
1✔
211
            callOptions) {
1✔
212
          @Override
213
          protected Status statusFromFailedFuture(ChannelFuture f) {
214
            return NettyClientTransport.this.statusFromFailedFuture(f);
1✔
215
          }
216
        },
217
        method,
218
        headers,
219
        channel,
220
        authority,
221
        negotiationScheme,
222
        userAgent,
223
        statsTraceCtx,
224
        transportTracer,
225
        callOptions,
226
        useGetForSafeMethods);
227
  }
228

229
  @SuppressWarnings("unchecked")
230
  @Override
231
  public Runnable start(Listener transportListener) {
232
    lifecycleManager = new ClientTransportLifecycleManager(
1✔
233
        Preconditions.checkNotNull(transportListener, "listener"));
1✔
234
    EventLoop eventLoop = group.next();
1✔
235
    if (keepAliveTimeNanos != KEEPALIVE_TIME_NANOS_DISABLED) {
1✔
236
      keepAliveManager = new KeepAliveManager(
1✔
237
          new ClientKeepAlivePinger(this), eventLoop, keepAliveTimeNanos,
238
          keepAliveTimeoutNanos, keepAliveWithoutCalls);
239
    }
240

241
    handler = NettyClientHandler.newHandler(
1✔
242
            lifecycleManager,
243
            keepAliveManager,
244
            autoFlowControl,
245
            flowControlWindow,
246
            maxHeaderListSize,
247
            softLimitHeaderListSize,
248
            GrpcUtil.STOPWATCH_SUPPLIER,
249
            tooManyPingsRunnable,
250
            transportTracer,
251
            eagAttributes,
252
            authorityString,
253
            channelLogger,
254
            ticker);
255

256
    ChannelHandler negotiationHandler = negotiator.newHandler(handler);
1✔
257

258
    Bootstrap b = new Bootstrap();
1✔
259
    b.option(ALLOCATOR, Utils.getByteBufAllocator(false));
1✔
260
    b.group(eventLoop);
1✔
261
    b.channelFactory(channelFactory);
1✔
262
    // For non-socket based channel, the option will be ignored.
263
    b.option(SO_KEEPALIVE, true);
1✔
264
    for (Map.Entry<ChannelOption<?>, ?> entry : channelOptions.entrySet()) {
1✔
265
      // Every entry in the map is obtained from
266
      // NettyChannelBuilder#withOption(ChannelOption<T> option, T value)
267
      // so it is safe to pass the key-value pair to b.option().
268
      b.option((ChannelOption<Object>) entry.getKey(), entry.getValue());
1✔
269
    }
1✔
270

271
    ChannelHandler bufferingHandler = new WriteBufferingAndExceptionHandler(negotiationHandler);
1✔
272

273
    /*
274
     * We don't use a ChannelInitializer in the client bootstrap because its "initChannel" method
275
     * is executed in the event loop and we need this handler to be in the pipeline immediately so
276
     * that it may begin buffering writes.
277
     */
278
    b.handler(bufferingHandler);
1✔
279
    ChannelFuture regFuture = b.register();
1✔
280
    if (regFuture.isDone() && !regFuture.isSuccess()) {
1✔
281
      channel = null;
1✔
282
      // Initialization has failed badly. All new streams should be made to fail.
283
      Throwable t = regFuture.cause();
1✔
284
      if (t == null) {
1✔
285
        t = new IllegalStateException("Channel is null, but future doesn't have a cause");
×
286
      }
287
      statusExplainingWhyTheChannelIsNull = Utils.statusFromThrowable(t);
1✔
288
      // Use a Runnable since lifecycleManager calls transportListener
289
      return new Runnable() {
1✔
290
        @Override
291
        public void run() {
292
          // NOTICE: we not are calling lifecycleManager from the event loop. But there isn't really
293
          // an event loop in this case, so nothing should be accessing the lifecycleManager. We
294
          // could use GlobalEventExecutor (which is what regFuture would use for notifying
295
          // listeners in this case), but avoiding on-demand thread creation in an error case seems
296
          // a good idea and is probably clearer threading.
297
          lifecycleManager.notifyTerminated(statusExplainingWhyTheChannelIsNull,
1✔
298
              SimpleDisconnectError.UNKNOWN);
299
        }
1✔
300
      };
301
    }
302
    channel = regFuture.channel();
1✔
303
    // For non-epoll based channel, the option will be ignored.
304
    try {
305
      if (keepAliveTimeNanos != KEEPALIVE_TIME_NANOS_DISABLED
1✔
306
              && Class.forName("io.netty.channel.epoll.AbstractEpollChannel").isInstance(channel)) {
1✔
307
        ChannelOption<Integer> tcpUserTimeout = Utils.maybeGetTcpUserTimeoutOption();
1✔
308
        if (tcpUserTimeout != null) {
1✔
309
          int tcpUserTimeoutMs = (int) TimeUnit.NANOSECONDS.toMillis(keepAliveTimeoutNanos);
1✔
310
          channel.config().setOption(tcpUserTimeout, tcpUserTimeoutMs);
1✔
311
        }
312
      }
313
    } catch (ClassNotFoundException ignored) {
×
314
      // JVM did not load AbstractEpollChannel, so the current channel will not be of epoll type,
315
      // so there is no need to set TCP_USER_TIMEOUT
316
    }
1✔
317
    // Start the write queue as soon as the channel is constructed
318
    handler.startWriteQueue(channel);
1✔
319
    // This write will have no effect, yet it will only complete once the negotiationHandler
320
    // flushes any pending writes. We need it to be staged *before* the `connect` so that
321
    // the channel can't have been closed yet, removing all handlers. This write will sit in the
322
    // AbstractBufferingHandler's buffer, and will either be flushed on a successful connection,
323
    // or failed if the connection fails.
324
    channel.writeAndFlush(NettyClientHandler.NOOP_MESSAGE).addListener(new ChannelFutureListener() {
1✔
325
      @Override
326
      public void operationComplete(ChannelFuture future) throws Exception {
327
        if (!future.isSuccess()) {
1✔
328
          // Need to notify of this failure, because NettyClientHandler may not have been added to
329
          // the pipeline before the error occurred.
330
          lifecycleManager.notifyTerminated(Utils.statusFromThrowable(future.cause()),
1✔
331
              SimpleDisconnectError.UNKNOWN);
332
        }
333
      }
1✔
334
    });
335
    // Start the connection operation to the server.
336
    SocketAddress localAddress =
1✔
337
        localSocketPicker.createSocketAddress(remoteAddress, eagAttributes);
1✔
338
    if (localAddress != null) {
1✔
339
      channel.connect(remoteAddress, localAddress);
×
340
    } else {
341
      channel.connect(remoteAddress);
1✔
342
    }
343

344
    if (keepAliveManager != null) {
1✔
345
      keepAliveManager.onTransportStarted();
1✔
346
    }
347

348
    return null;
1✔
349
  }
350

351
  @Override
352
  public void shutdown(Status reason) {
353
    // start() could have failed
354
    if (channel == null) {
1✔
355
      return;
1✔
356
    }
357
    // Notifying of termination is automatically done when the channel closes.
358
    if (channel.isOpen()) {
1✔
359
      handler.getWriteQueue().enqueue(new GracefulCloseCommand(reason), true);
1✔
360
    }
361
  }
1✔
362

363
  @Override
364
  public void shutdownNow(final Status reason) {
365
    shutdownNow(reason, SimpleDisconnectError.SUBCHANNEL_SHUTDOWN);
1✔
366
  }
1✔
367

368
  @Override
369
  public void shutdownNow(final Status reason, DisconnectError disconnectError) {
370
    // Notifying of termination is automatically done when the channel closes.
371
    if (channel != null && channel.isOpen()) {
1✔
372
      handler.getWriteQueue().enqueue(new Runnable() {
1✔
373
        @Override
374
        public void run() {
375
          lifecycleManager.notifyShutdown(reason, disconnectError);
1✔
376
          channel.write(new ForcefulCloseCommand(reason));
1✔
377
        }
1✔
378
      }, true);
379
    }
380
  }
1✔
381

382
  @Override
383
  public String toString() {
384
    return MoreObjects.toStringHelper(this)
1✔
385
        .add("logId", logId.getId())
1✔
386
        .add("remoteAddress", remoteAddress)
1✔
387
        .add("channel", channel)
1✔
388
        .toString();
1✔
389
  }
390

391
  @Override
392
  public InternalLogId getLogId() {
393
    return logId;
1✔
394
  }
395

396
  @Override
397
  public Attributes getAttributes() {
398
    return handler.getAttributes();
1✔
399
  }
400

401
  @Override
402
  public ListenableFuture<SocketStats> getStats() {
403
    final SettableFuture<SocketStats> result = SettableFuture.create();
1✔
404
    if (channel.eventLoop().inEventLoop()) {
1✔
405
      // This is necessary, otherwise we will block forever if we get the future from inside
406
      // the event loop.
407
      result.set(getStatsHelper(channel));
×
408
      return result;
×
409
    }
410
    channel.eventLoop().submit(
1✔
411
        new Runnable() {
1✔
412
          @Override
413
          public void run() {
414
            result.set(getStatsHelper(channel));
1✔
415
          }
1✔
416
        })
417
        .addListener(
1✔
418
            new GenericFutureListener<Future<Object>>() {
1✔
419
              @Override
420
              public void operationComplete(Future<Object> future) throws Exception {
421
                if (!future.isSuccess()) {
1✔
422
                  result.setException(future.cause());
×
423
                }
424
              }
1✔
425
            });
426
    return result;
1✔
427
  }
428

429
  private SocketStats getStatsHelper(Channel ch) {
430
    assert ch.eventLoop().inEventLoop();
1✔
431
    return new SocketStats(
1✔
432
        transportTracer.getStats(),
1✔
433
        channel.localAddress(),
1✔
434
        channel.remoteAddress(),
1✔
435
        Utils.getSocketOptions(ch),
1✔
436
        handler == null ? null : handler.getSecurityInfo());
1✔
437
  }
438

439
  @VisibleForTesting
440
  Channel channel() {
441
    return channel;
1✔
442
  }
443

444
  @VisibleForTesting
445
  KeepAliveManager keepAliveManager() {
446
    return keepAliveManager;
1✔
447
  }
448

449
  /**
450
   * Convert ChannelFuture.cause() to a Status, taking into account that all handlers are removed
451
   * from the pipeline when the channel is closed. Since handlers are removed, you may get an
452
   * unhelpful exception like ClosedChannelException.
453
   *
454
   * <p>This method must only be called on the event loop.
455
   */
456
  private Status statusFromFailedFuture(ChannelFuture f) {
457
    Throwable t = f.cause();
1✔
458
    if (t instanceof ClosedChannelException
1✔
459
        // Exception thrown by the StreamBufferingEncoder if the channel is closed while there
460
        // are still streams buffered. This exception is not helpful. Replace it by the real
461
        // cause of the shutdown (if available).
462
        || t instanceof Http2ChannelClosedException) {
463
      Status shutdownStatus = lifecycleManager.getShutdownStatus();
1✔
464
      if (shutdownStatus == null) {
1✔
465
        return Status.UNKNOWN.withDescription("Channel closed but for unknown reason")
×
466
            .withCause(new ClosedChannelException().initCause(t));
×
467
      }
468
      return shutdownStatus;
1✔
469
    }
470
    return Utils.statusFromThrowable(t);
1✔
471
  }
472
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc