• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19711

24 Feb 2025 02:58PM UTC coverage: 88.545% (-0.08%) from 88.622%
#19711

push

github

web-flow
netty: Per-rpc authority verification against peer cert subject names (#11724)

Per-rpc verification of authority specified via call options or set by the LB API against peer cert's subject names.

34437 of 38892 relevant lines covered (88.55%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.7
/../netty/src/main/java/io/grpc/netty/NettyClientTransport.java
1
/*
2
 * Copyright 2014 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.netty;
18

19
import static io.grpc.internal.GrpcUtil.KEEPALIVE_TIME_NANOS_DISABLED;
20
import static io.netty.channel.ChannelOption.ALLOCATOR;
21
import static io.netty.channel.ChannelOption.SO_KEEPALIVE;
22

23
import com.google.common.annotations.VisibleForTesting;
24
import com.google.common.base.MoreObjects;
25
import com.google.common.base.Preconditions;
26
import com.google.common.base.Ticker;
27
import com.google.common.util.concurrent.ListenableFuture;
28
import com.google.common.util.concurrent.SettableFuture;
29
import io.grpc.Attributes;
30
import io.grpc.CallOptions;
31
import io.grpc.ChannelLogger;
32
import io.grpc.ClientStreamTracer;
33
import io.grpc.InternalChannelz.SocketStats;
34
import io.grpc.InternalLogId;
35
import io.grpc.Metadata;
36
import io.grpc.MethodDescriptor;
37
import io.grpc.Status;
38
import io.grpc.internal.ClientStream;
39
import io.grpc.internal.ConnectionClientTransport;
40
import io.grpc.internal.FailingClientStream;
41
import io.grpc.internal.GrpcUtil;
42
import io.grpc.internal.Http2Ping;
43
import io.grpc.internal.KeepAliveManager;
44
import io.grpc.internal.KeepAliveManager.ClientKeepAlivePinger;
45
import io.grpc.internal.StatsTraceContext;
46
import io.grpc.internal.TransportTracer;
47
import io.grpc.netty.NettyChannelBuilder.LocalSocketPicker;
48
import io.netty.bootstrap.Bootstrap;
49
import io.netty.channel.Channel;
50
import io.netty.channel.ChannelFactory;
51
import io.netty.channel.ChannelFuture;
52
import io.netty.channel.ChannelFutureListener;
53
import io.netty.channel.ChannelHandler;
54
import io.netty.channel.ChannelOption;
55
import io.netty.channel.EventLoop;
56
import io.netty.channel.EventLoopGroup;
57
import io.netty.handler.codec.http2.StreamBufferingEncoder.Http2ChannelClosedException;
58
import io.netty.util.AsciiString;
59
import io.netty.util.concurrent.Future;
60
import io.netty.util.concurrent.GenericFutureListener;
61
import java.net.SocketAddress;
62
import java.nio.channels.ClosedChannelException;
63
import java.util.Map;
64
import java.util.concurrent.Executor;
65
import java.util.concurrent.TimeUnit;
66
import javax.annotation.Nullable;
67

68
/**
69
 * A Netty-based {@link ConnectionClientTransport} implementation.
70
 */
71
class NettyClientTransport implements ConnectionClientTransport {
1✔
72

73
  private final InternalLogId logId;
74
  private final Map<ChannelOption<?>, ?> channelOptions;
75
  private final SocketAddress remoteAddress;
76
  private final ChannelFactory<? extends Channel> channelFactory;
77
  private final EventLoopGroup group;
78
  private final ProtocolNegotiator negotiator;
79
  private final String authorityString;
80
  private final AsciiString authority;
81
  private final AsciiString userAgent;
82
  private final boolean autoFlowControl;
83
  private final int flowControlWindow;
84
  private final int maxMessageSize;
85
  private final int maxHeaderListSize;
86
  private final int softLimitHeaderListSize;
87
  private KeepAliveManager keepAliveManager;
88
  private final long keepAliveTimeNanos;
89
  private final long keepAliveTimeoutNanos;
90
  private final boolean keepAliveWithoutCalls;
91
  private final AsciiString negotiationScheme;
92
  private final Runnable tooManyPingsRunnable;
93
  private NettyClientHandler handler;
94
  // We should not send on the channel until negotiation completes. This is a hard requirement
95
  // by SslHandler but is appropriate for HTTP/1.1 Upgrade as well.
96
  private Channel channel;
97
  /** If {@link #start} has been called, non-{@code null} if channel is {@code null}. */
98
  private Status statusExplainingWhyTheChannelIsNull;
99
  /** Since not thread-safe, may only be used from event loop. */
100
  private ClientTransportLifecycleManager lifecycleManager;
101
  /** Since not thread-safe, may only be used from event loop. */
102
  private final TransportTracer transportTracer;
103
  private final Attributes eagAttributes;
104
  private final LocalSocketPicker localSocketPicker;
105
  private final ChannelLogger channelLogger;
106
  private final boolean useGetForSafeMethods;
107
  private final Ticker ticker;
108

109

110
  NettyClientTransport(
111
      SocketAddress address,
112
      ChannelFactory<? extends Channel> channelFactory,
113
      Map<ChannelOption<?>, ?> channelOptions,
114
      EventLoopGroup group,
115
      ProtocolNegotiator negotiator,
116
      boolean autoFlowControl,
117
      int flowControlWindow,
118
      int maxMessageSize,
119
      int maxHeaderListSize,
120
      int softLimitHeaderListSize,
121
      long keepAliveTimeNanos,
122
      long keepAliveTimeoutNanos,
123
      boolean keepAliveWithoutCalls,
124
      String authority,
125
      @Nullable String userAgent,
126
      Runnable tooManyPingsRunnable,
127
      TransportTracer transportTracer,
128
      Attributes eagAttributes,
129
      LocalSocketPicker localSocketPicker,
130
      ChannelLogger channelLogger,
131
      boolean useGetForSafeMethods,
132
      Ticker ticker) {
1✔
133

134
    this.negotiator = Preconditions.checkNotNull(negotiator, "negotiator");
1✔
135
    this.negotiationScheme = this.negotiator.scheme();
1✔
136
    this.remoteAddress = Preconditions.checkNotNull(address, "address");
1✔
137
    this.group = Preconditions.checkNotNull(group, "group");
1✔
138
    this.channelFactory = channelFactory;
1✔
139
    this.channelOptions = Preconditions.checkNotNull(channelOptions, "channelOptions");
1✔
140
    this.autoFlowControl = autoFlowControl;
1✔
141
    this.flowControlWindow = flowControlWindow;
1✔
142
    this.maxMessageSize = maxMessageSize;
1✔
143
    this.maxHeaderListSize = maxHeaderListSize;
1✔
144
    this.softLimitHeaderListSize = softLimitHeaderListSize;
1✔
145
    this.keepAliveTimeNanos = keepAliveTimeNanos;
1✔
146
    this.keepAliveTimeoutNanos = keepAliveTimeoutNanos;
1✔
147
    this.keepAliveWithoutCalls = keepAliveWithoutCalls;
1✔
148
    this.authorityString = authority;
1✔
149
    this.authority = new AsciiString(authority);
1✔
150
    this.userAgent = new AsciiString(GrpcUtil.getGrpcUserAgent("netty", userAgent));
1✔
151
    this.tooManyPingsRunnable =
1✔
152
        Preconditions.checkNotNull(tooManyPingsRunnable, "tooManyPingsRunnable");
1✔
153
    this.transportTracer = Preconditions.checkNotNull(transportTracer, "transportTracer");
1✔
154
    this.eagAttributes = Preconditions.checkNotNull(eagAttributes, "eagAttributes");
1✔
155
    this.localSocketPicker = Preconditions.checkNotNull(localSocketPicker, "localSocketPicker");
1✔
156
    this.logId = InternalLogId.allocate(getClass(), remoteAddress.toString());
1✔
157
    this.channelLogger = Preconditions.checkNotNull(channelLogger, "channelLogger");
1✔
158
    this.useGetForSafeMethods = useGetForSafeMethods;
1✔
159
    this.ticker = Preconditions.checkNotNull(ticker, "ticker");
1✔
160
  }
1✔
161

162
  @Override
163
  public void ping(final PingCallback callback, final Executor executor) {
164
    if (channel == null) {
1✔
165
      executor.execute(new Runnable() {
1✔
166
        @Override
167
        public void run() {
168
          callback.onFailure(statusExplainingWhyTheChannelIsNull.asException());
1✔
169
        }
1✔
170
      });
171
      return;
1✔
172
    }
173
    // The promise and listener always succeed in NettyClientHandler. So this listener handles the
174
    // error case, when the channel is closed and the NettyClientHandler no longer in the pipeline.
175
    ChannelFutureListener failureListener = new ChannelFutureListener() {
1✔
176
      @Override
177
      public void operationComplete(ChannelFuture future) throws Exception {
178
        if (!future.isSuccess()) {
1✔
179
          Status s = statusFromFailedFuture(future);
1✔
180
          Http2Ping.notifyFailed(callback, executor, s.asException());
1✔
181
        }
182
      }
1✔
183
    };
184
    // Write the command requesting the ping
185
    handler.getWriteQueue().enqueue(new SendPingCommand(callback, executor), true)
1✔
186
        .addListener(failureListener);
1✔
187
  }
1✔
188

189
  @Override
190
  public ClientStream newStream(
191
      MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions,
192
      ClientStreamTracer[] tracers) {
193
    Preconditions.checkNotNull(method, "method");
1✔
194
    Preconditions.checkNotNull(headers, "headers");
1✔
195
    if (channel == null) {
1✔
196
      return new FailingClientStream(statusExplainingWhyTheChannelIsNull, tracers);
1✔
197
    }
198
    StatsTraceContext statsTraceCtx =
1✔
199
        StatsTraceContext.newClientContext(tracers, getAttributes(), headers);
1✔
200
    return new NettyClientStream(
1✔
201
        new NettyClientStream.TransportState(
202
            handler,
203
            channel.eventLoop(),
1✔
204
            maxMessageSize,
205
            statsTraceCtx,
206
            transportTracer,
207
            method.getFullMethodName(),
1✔
208
            callOptions) {
1✔
209
          @Override
210
          protected Status statusFromFailedFuture(ChannelFuture f) {
211
            return NettyClientTransport.this.statusFromFailedFuture(f);
1✔
212
          }
213
        },
214
        method,
215
        headers,
216
        channel,
217
        authority,
218
        negotiationScheme,
219
        userAgent,
220
        statsTraceCtx,
221
        transportTracer,
222
        callOptions,
223
        useGetForSafeMethods);
224
  }
225

226
  @SuppressWarnings("unchecked")
227
  @Override
228
  public Runnable start(Listener transportListener) {
229
    lifecycleManager = new ClientTransportLifecycleManager(
1✔
230
        Preconditions.checkNotNull(transportListener, "listener"));
1✔
231
    EventLoop eventLoop = group.next();
1✔
232
    if (keepAliveTimeNanos != KEEPALIVE_TIME_NANOS_DISABLED) {
1✔
233
      keepAliveManager = new KeepAliveManager(
1✔
234
          new ClientKeepAlivePinger(this), eventLoop, keepAliveTimeNanos, keepAliveTimeoutNanos,
235
          keepAliveWithoutCalls);
236
    }
237

238
    handler = NettyClientHandler.newHandler(
1✔
239
            lifecycleManager,
240
            keepAliveManager,
241
            autoFlowControl,
242
            flowControlWindow,
243
            maxHeaderListSize,
244
            softLimitHeaderListSize,
245
            GrpcUtil.STOPWATCH_SUPPLIER,
246
            tooManyPingsRunnable,
247
            transportTracer,
248
            eagAttributes,
249
            authorityString,
250
            channelLogger,
251
            ticker);
252

253
    ChannelHandler negotiationHandler = negotiator.newHandler(handler);
1✔
254

255
    Bootstrap b = new Bootstrap();
1✔
256
    b.option(ALLOCATOR, Utils.getByteBufAllocator(false));
1✔
257
    b.group(eventLoop);
1✔
258
    b.channelFactory(channelFactory);
1✔
259
    // For non-socket based channel, the option will be ignored.
260
    b.option(SO_KEEPALIVE, true);
1✔
261
    for (Map.Entry<ChannelOption<?>, ?> entry : channelOptions.entrySet()) {
1✔
262
      // Every entry in the map is obtained from
263
      // NettyChannelBuilder#withOption(ChannelOption<T> option, T value)
264
      // so it is safe to pass the key-value pair to b.option().
265
      b.option((ChannelOption<Object>) entry.getKey(), entry.getValue());
1✔
266
    }
1✔
267

268
    ChannelHandler bufferingHandler = new WriteBufferingAndExceptionHandler(negotiationHandler);
1✔
269

270
    /*
271
     * We don't use a ChannelInitializer in the client bootstrap because its "initChannel" method
272
     * is executed in the event loop and we need this handler to be in the pipeline immediately so
273
     * that it may begin buffering writes.
274
     */
275
    b.handler(bufferingHandler);
1✔
276
    ChannelFuture regFuture = b.register();
1✔
277
    if (regFuture.isDone() && !regFuture.isSuccess()) {
1✔
278
      channel = null;
1✔
279
      // Initialization has failed badly. All new streams should be made to fail.
280
      Throwable t = regFuture.cause();
1✔
281
      if (t == null) {
1✔
282
        t = new IllegalStateException("Channel is null, but future doesn't have a cause");
×
283
      }
284
      statusExplainingWhyTheChannelIsNull = Utils.statusFromThrowable(t);
1✔
285
      // Use a Runnable since lifecycleManager calls transportListener
286
      return new Runnable() {
1✔
287
        @Override
288
        public void run() {
289
          // NOTICE: we not are calling lifecycleManager from the event loop. But there isn't really
290
          // an event loop in this case, so nothing should be accessing the lifecycleManager. We
291
          // could use GlobalEventExecutor (which is what regFuture would use for notifying
292
          // listeners in this case), but avoiding on-demand thread creation in an error case seems
293
          // a good idea and is probably clearer threading.
294
          lifecycleManager.notifyTerminated(statusExplainingWhyTheChannelIsNull);
1✔
295
        }
1✔
296
      };
297
    }
298
    channel = regFuture.channel();
1✔
299
    // For non-epoll based channel, the option will be ignored.
300
    try {
301
      if (keepAliveTimeNanos != KEEPALIVE_TIME_NANOS_DISABLED
1✔
302
              && Class.forName("io.netty.channel.epoll.AbstractEpollChannel").isInstance(channel)) {
1✔
303
        ChannelOption<Integer> tcpUserTimeout = Utils.maybeGetTcpUserTimeoutOption();
1✔
304
        if (tcpUserTimeout != null) {
1✔
305
          int tcpUserTimeoutMs = (int) TimeUnit.NANOSECONDS.toMillis(keepAliveTimeoutNanos);
1✔
306
          channel.config().setOption(tcpUserTimeout, tcpUserTimeoutMs);
1✔
307
        }
308
      }
309
    } catch (ClassNotFoundException ignored) {
×
310
      // JVM did not load AbstractEpollChannel, so the current channel will not be of epoll type,
311
      // so there is no need to set TCP_USER_TIMEOUT
312
    }
1✔
313
    // Start the write queue as soon as the channel is constructed
314
    handler.startWriteQueue(channel);
1✔
315
    // This write will have no effect, yet it will only complete once the negotiationHandler
316
    // flushes any pending writes. We need it to be staged *before* the `connect` so that
317
    // the channel can't have been closed yet, removing all handlers. This write will sit in the
318
    // AbstractBufferingHandler's buffer, and will either be flushed on a successful connection,
319
    // or failed if the connection fails.
320
    channel.writeAndFlush(NettyClientHandler.NOOP_MESSAGE).addListener(new ChannelFutureListener() {
1✔
321
      @Override
322
      public void operationComplete(ChannelFuture future) throws Exception {
323
        if (!future.isSuccess()) {
1✔
324
          // Need to notify of this failure, because NettyClientHandler may not have been added to
325
          // the pipeline before the error occurred.
326
          lifecycleManager.notifyTerminated(Utils.statusFromThrowable(future.cause()));
1✔
327
        }
328
      }
1✔
329
    });
330
    // Start the connection operation to the server.
331
    SocketAddress localAddress =
1✔
332
        localSocketPicker.createSocketAddress(remoteAddress, eagAttributes);
1✔
333
    if (localAddress != null) {
1✔
334
      channel.connect(remoteAddress, localAddress);
×
335
    } else {
336
      channel.connect(remoteAddress);
1✔
337
    }
338

339
    if (keepAliveManager != null) {
1✔
340
      keepAliveManager.onTransportStarted();
1✔
341
    }
342

343
    return null;
1✔
344
  }
345

346
  @Override
347
  public void shutdown(Status reason) {
348
    // start() could have failed
349
    if (channel == null) {
1✔
350
      return;
1✔
351
    }
352
    // Notifying of termination is automatically done when the channel closes.
353
    if (channel.isOpen()) {
1✔
354
      handler.getWriteQueue().enqueue(new GracefulCloseCommand(reason), true);
1✔
355
    }
356
  }
1✔
357

358
  @Override
359
  public void shutdownNow(final Status reason) {
360
    // Notifying of termination is automatically done when the channel closes.
361
    if (channel != null && channel.isOpen()) {
1✔
362
      handler.getWriteQueue().enqueue(new Runnable() {
1✔
363
        @Override
364
        public void run() {
365
          lifecycleManager.notifyShutdown(reason);
1✔
366
          channel.write(new ForcefulCloseCommand(reason));
1✔
367
        }
1✔
368
      }, true);
369
    }
370
  }
1✔
371

372
  @Override
373
  public String toString() {
374
    return MoreObjects.toStringHelper(this)
1✔
375
        .add("logId", logId.getId())
1✔
376
        .add("remoteAddress", remoteAddress)
1✔
377
        .add("channel", channel)
1✔
378
        .toString();
1✔
379
  }
380

381
  @Override
382
  public InternalLogId getLogId() {
383
    return logId;
1✔
384
  }
385

386
  @Override
387
  public Attributes getAttributes() {
388
    return handler.getAttributes();
1✔
389
  }
390

391
  @Override
392
  public ListenableFuture<SocketStats> getStats() {
393
    final SettableFuture<SocketStats> result = SettableFuture.create();
1✔
394
    if (channel.eventLoop().inEventLoop()) {
1✔
395
      // This is necessary, otherwise we will block forever if we get the future from inside
396
      // the event loop.
397
      result.set(getStatsHelper(channel));
×
398
      return result;
×
399
    }
400
    channel.eventLoop().submit(
1✔
401
        new Runnable() {
1✔
402
          @Override
403
          public void run() {
404
            result.set(getStatsHelper(channel));
1✔
405
          }
1✔
406
        })
407
        .addListener(
1✔
408
            new GenericFutureListener<Future<Object>>() {
1✔
409
              @Override
410
              public void operationComplete(Future<Object> future) throws Exception {
411
                if (!future.isSuccess()) {
1✔
412
                  result.setException(future.cause());
×
413
                }
414
              }
1✔
415
            });
416
    return result;
1✔
417
  }
418

419
  private SocketStats getStatsHelper(Channel ch) {
420
    assert ch.eventLoop().inEventLoop();
1✔
421
    return new SocketStats(
1✔
422
        transportTracer.getStats(),
1✔
423
        channel.localAddress(),
1✔
424
        channel.remoteAddress(),
1✔
425
        Utils.getSocketOptions(ch),
1✔
426
        handler == null ? null : handler.getSecurityInfo());
1✔
427
  }
428

429
  @VisibleForTesting
430
  Channel channel() {
431
    return channel;
1✔
432
  }
433

434
  @VisibleForTesting
435
  KeepAliveManager keepAliveManager() {
436
    return keepAliveManager;
1✔
437
  }
438

439
  /**
440
   * Convert ChannelFuture.cause() to a Status, taking into account that all handlers are removed
441
   * from the pipeline when the channel is closed. Since handlers are removed, you may get an
442
   * unhelpful exception like ClosedChannelException.
443
   *
444
   * <p>This method must only be called on the event loop.
445
   */
446
  private Status statusFromFailedFuture(ChannelFuture f) {
447
    Throwable t = f.cause();
1✔
448
    if (t instanceof ClosedChannelException
1✔
449
        // Exception thrown by the StreamBufferingEncoder if the channel is closed while there
450
        // are still streams buffered. This exception is not helpful. Replace it by the real
451
        // cause of the shutdown (if available).
452
        || t instanceof Http2ChannelClosedException) {
453
      Status shutdownStatus = lifecycleManager.getShutdownStatus();
1✔
454
      if (shutdownStatus == null) {
1✔
455
        return Status.UNKNOWN.withDescription("Channel closed but for unknown reason")
×
456
            .withCause(new ClosedChannelException().initCause(t));
×
457
      }
458
      return shutdownStatus;
1✔
459
    }
460
    return Utils.statusFromThrowable(t);
1✔
461
  }
462
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc