• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19528

28 Oct 2024 05:25PM UTC coverage: 84.59% (-0.04%) from 84.627%
#19528

push

github

web-flow
netty: add soft Metadata size limit enforcement. (#11603)

33914 of 40092 relevant lines covered (84.59%)

0.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.7
/../netty/src/main/java/io/grpc/netty/NettyClientTransport.java
1
/*
2
 * Copyright 2014 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.netty;
18

19
import static io.grpc.internal.GrpcUtil.KEEPALIVE_TIME_NANOS_DISABLED;
20
import static io.netty.channel.ChannelOption.ALLOCATOR;
21
import static io.netty.channel.ChannelOption.SO_KEEPALIVE;
22

23
import com.google.common.annotations.VisibleForTesting;
24
import com.google.common.base.MoreObjects;
25
import com.google.common.base.Preconditions;
26
import com.google.common.base.Ticker;
27
import com.google.common.util.concurrent.ListenableFuture;
28
import com.google.common.util.concurrent.SettableFuture;
29
import io.grpc.Attributes;
30
import io.grpc.CallOptions;
31
import io.grpc.ChannelLogger;
32
import io.grpc.ClientStreamTracer;
33
import io.grpc.InternalChannelz.SocketStats;
34
import io.grpc.InternalLogId;
35
import io.grpc.Metadata;
36
import io.grpc.MethodDescriptor;
37
import io.grpc.Status;
38
import io.grpc.internal.ClientStream;
39
import io.grpc.internal.ConnectionClientTransport;
40
import io.grpc.internal.FailingClientStream;
41
import io.grpc.internal.GrpcUtil;
42
import io.grpc.internal.Http2Ping;
43
import io.grpc.internal.KeepAliveManager;
44
import io.grpc.internal.KeepAliveManager.ClientKeepAlivePinger;
45
import io.grpc.internal.StatsTraceContext;
46
import io.grpc.internal.TransportTracer;
47
import io.grpc.netty.NettyChannelBuilder.LocalSocketPicker;
48
import io.netty.bootstrap.Bootstrap;
49
import io.netty.channel.Channel;
50
import io.netty.channel.ChannelFactory;
51
import io.netty.channel.ChannelFuture;
52
import io.netty.channel.ChannelFutureListener;
53
import io.netty.channel.ChannelHandler;
54
import io.netty.channel.ChannelOption;
55
import io.netty.channel.EventLoop;
56
import io.netty.channel.EventLoopGroup;
57
import io.netty.handler.codec.http2.StreamBufferingEncoder.Http2ChannelClosedException;
58
import io.netty.util.AsciiString;
59
import io.netty.util.concurrent.Future;
60
import io.netty.util.concurrent.GenericFutureListener;
61
import java.net.SocketAddress;
62
import java.nio.channels.ClosedChannelException;
63
import java.util.Map;
64
import java.util.concurrent.Executor;
65
import java.util.concurrent.TimeUnit;
66
import javax.annotation.Nullable;
67

68
/**
69
 * A Netty-based {@link ConnectionClientTransport} implementation.
70
 */
71
class NettyClientTransport implements ConnectionClientTransport {
1✔
72

73
  private final InternalLogId logId;
74
  private final Map<ChannelOption<?>, ?> channelOptions;
75
  private final SocketAddress remoteAddress;
76
  private final ChannelFactory<? extends Channel> channelFactory;
77
  private final EventLoopGroup group;
78
  private final ProtocolNegotiator negotiator;
79
  private final String authorityString;
80
  private final AsciiString authority;
81
  private final AsciiString userAgent;
82
  private final boolean autoFlowControl;
83
  private final int flowControlWindow;
84
  private final int maxMessageSize;
85
  private final int maxHeaderListSize;
86
  private final int softLimitHeaderListSize;
87
  private KeepAliveManager keepAliveManager;
88
  private final long keepAliveTimeNanos;
89
  private final long keepAliveTimeoutNanos;
90
  private final boolean keepAliveWithoutCalls;
91
  private final AsciiString negotiationScheme;
92
  private final Runnable tooManyPingsRunnable;
93
  private NettyClientHandler handler;
94
  // We should not send on the channel until negotiation completes. This is a hard requirement
95
  // by SslHandler but is appropriate for HTTP/1.1 Upgrade as well.
96
  private Channel channel;
97
  /** If {@link #start} has been called, non-{@code null} if channel is {@code null}. */
98
  private Status statusExplainingWhyTheChannelIsNull;
99
  /** Since not thread-safe, may only be used from event loop. */
100
  private ClientTransportLifecycleManager lifecycleManager;
101
  /** Since not thread-safe, may only be used from event loop. */
102
  private final TransportTracer transportTracer;
103
  private final Attributes eagAttributes;
104
  private final LocalSocketPicker localSocketPicker;
105
  private final ChannelLogger channelLogger;
106
  private final boolean useGetForSafeMethods;
107
  private final Ticker ticker;
108

109
  NettyClientTransport(
110
      SocketAddress address,
111
      ChannelFactory<? extends Channel> channelFactory,
112
      Map<ChannelOption<?>, ?> channelOptions,
113
      EventLoopGroup group,
114
      ProtocolNegotiator negotiator,
115
      boolean autoFlowControl,
116
      int flowControlWindow,
117
      int maxMessageSize,
118
      int maxHeaderListSize,
119
      int softLimitHeaderListSize,
120
      long keepAliveTimeNanos,
121
      long keepAliveTimeoutNanos,
122
      boolean keepAliveWithoutCalls,
123
      String authority,
124
      @Nullable String userAgent,
125
      Runnable tooManyPingsRunnable,
126
      TransportTracer transportTracer,
127
      Attributes eagAttributes,
128
      LocalSocketPicker localSocketPicker,
129
      ChannelLogger channelLogger,
130
      boolean useGetForSafeMethods,
131
      Ticker ticker) {
1✔
132

133
    this.negotiator = Preconditions.checkNotNull(negotiator, "negotiator");
1✔
134
    this.negotiationScheme = this.negotiator.scheme();
1✔
135
    this.remoteAddress = Preconditions.checkNotNull(address, "address");
1✔
136
    this.group = Preconditions.checkNotNull(group, "group");
1✔
137
    this.channelFactory = channelFactory;
1✔
138
    this.channelOptions = Preconditions.checkNotNull(channelOptions, "channelOptions");
1✔
139
    this.autoFlowControl = autoFlowControl;
1✔
140
    this.flowControlWindow = flowControlWindow;
1✔
141
    this.maxMessageSize = maxMessageSize;
1✔
142
    this.maxHeaderListSize = maxHeaderListSize;
1✔
143
    this.softLimitHeaderListSize = softLimitHeaderListSize;
1✔
144
    this.keepAliveTimeNanos = keepAliveTimeNanos;
1✔
145
    this.keepAliveTimeoutNanos = keepAliveTimeoutNanos;
1✔
146
    this.keepAliveWithoutCalls = keepAliveWithoutCalls;
1✔
147
    this.authorityString = authority;
1✔
148
    this.authority = new AsciiString(authority);
1✔
149
    this.userAgent = new AsciiString(GrpcUtil.getGrpcUserAgent("netty", userAgent));
1✔
150
    this.tooManyPingsRunnable =
1✔
151
        Preconditions.checkNotNull(tooManyPingsRunnable, "tooManyPingsRunnable");
1✔
152
    this.transportTracer = Preconditions.checkNotNull(transportTracer, "transportTracer");
1✔
153
    this.eagAttributes = Preconditions.checkNotNull(eagAttributes, "eagAttributes");
1✔
154
    this.localSocketPicker = Preconditions.checkNotNull(localSocketPicker, "localSocketPicker");
1✔
155
    this.logId = InternalLogId.allocate(getClass(), remoteAddress.toString());
1✔
156
    this.channelLogger = Preconditions.checkNotNull(channelLogger, "channelLogger");
1✔
157
    this.useGetForSafeMethods = useGetForSafeMethods;
1✔
158
    this.ticker = Preconditions.checkNotNull(ticker, "ticker");
1✔
159
  }
1✔
160

161
  @Override
162
  public void ping(final PingCallback callback, final Executor executor) {
163
    if (channel == null) {
1✔
164
      executor.execute(new Runnable() {
1✔
165
        @Override
166
        public void run() {
167
          callback.onFailure(statusExplainingWhyTheChannelIsNull.asException());
1✔
168
        }
1✔
169
      });
170
      return;
1✔
171
    }
172
    // The promise and listener always succeed in NettyClientHandler. So this listener handles the
173
    // error case, when the channel is closed and the NettyClientHandler no longer in the pipeline.
174
    ChannelFutureListener failureListener = new ChannelFutureListener() {
1✔
175
      @Override
176
      public void operationComplete(ChannelFuture future) throws Exception {
177
        if (!future.isSuccess()) {
1✔
178
          Status s = statusFromFailedFuture(future);
1✔
179
          Http2Ping.notifyFailed(callback, executor, s.asException());
1✔
180
        }
181
      }
1✔
182
    };
183
    // Write the command requesting the ping
184
    handler.getWriteQueue().enqueue(new SendPingCommand(callback, executor), true)
1✔
185
        .addListener(failureListener);
1✔
186
  }
1✔
187

188
  @Override
189
  public ClientStream newStream(
190
      MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions,
191
      ClientStreamTracer[] tracers) {
192
    Preconditions.checkNotNull(method, "method");
1✔
193
    Preconditions.checkNotNull(headers, "headers");
1✔
194
    if (channel == null) {
1✔
195
      return new FailingClientStream(statusExplainingWhyTheChannelIsNull, tracers);
1✔
196
    }
197
    StatsTraceContext statsTraceCtx =
1✔
198
        StatsTraceContext.newClientContext(tracers, getAttributes(), headers);
1✔
199
    return new NettyClientStream(
1✔
200
        new NettyClientStream.TransportState(
201
            handler,
202
            channel.eventLoop(),
1✔
203
            maxMessageSize,
204
            statsTraceCtx,
205
            transportTracer,
206
            method.getFullMethodName(),
1✔
207
            callOptions) {
1✔
208
          @Override
209
          protected Status statusFromFailedFuture(ChannelFuture f) {
210
            return NettyClientTransport.this.statusFromFailedFuture(f);
1✔
211
          }
212
        },
213
        method,
214
        headers,
215
        channel,
216
        authority,
217
        negotiationScheme,
218
        userAgent,
219
        statsTraceCtx,
220
        transportTracer,
221
        callOptions,
222
        useGetForSafeMethods);
223
  }
224

225
  @SuppressWarnings("unchecked")
226
  @Override
227
  public Runnable start(Listener transportListener) {
228
    lifecycleManager = new ClientTransportLifecycleManager(
1✔
229
        Preconditions.checkNotNull(transportListener, "listener"));
1✔
230
    EventLoop eventLoop = group.next();
1✔
231
    if (keepAliveTimeNanos != KEEPALIVE_TIME_NANOS_DISABLED) {
1✔
232
      keepAliveManager = new KeepAliveManager(
1✔
233
          new ClientKeepAlivePinger(this), eventLoop, keepAliveTimeNanos, keepAliveTimeoutNanos,
234
          keepAliveWithoutCalls);
235
    }
236

237
    handler = NettyClientHandler.newHandler(
1✔
238
            lifecycleManager,
239
            keepAliveManager,
240
            autoFlowControl,
241
            flowControlWindow,
242
            maxHeaderListSize,
243
            softLimitHeaderListSize,
244
            GrpcUtil.STOPWATCH_SUPPLIER,
245
            tooManyPingsRunnable,
246
            transportTracer,
247
            eagAttributes,
248
            authorityString,
249
            channelLogger,
250
            ticker);
251

252
    ChannelHandler negotiationHandler = negotiator.newHandler(handler);
1✔
253

254
    Bootstrap b = new Bootstrap();
1✔
255
    b.option(ALLOCATOR, Utils.getByteBufAllocator(false));
1✔
256
    b.group(eventLoop);
1✔
257
    b.channelFactory(channelFactory);
1✔
258
    // For non-socket based channel, the option will be ignored.
259
    b.option(SO_KEEPALIVE, true);
1✔
260
    for (Map.Entry<ChannelOption<?>, ?> entry : channelOptions.entrySet()) {
1✔
261
      // Every entry in the map is obtained from
262
      // NettyChannelBuilder#withOption(ChannelOption<T> option, T value)
263
      // so it is safe to pass the key-value pair to b.option().
264
      b.option((ChannelOption<Object>) entry.getKey(), entry.getValue());
1✔
265
    }
1✔
266

267
    ChannelHandler bufferingHandler = new WriteBufferingAndExceptionHandler(negotiationHandler);
1✔
268

269
    /*
270
     * We don't use a ChannelInitializer in the client bootstrap because its "initChannel" method
271
     * is executed in the event loop and we need this handler to be in the pipeline immediately so
272
     * that it may begin buffering writes.
273
     */
274
    b.handler(bufferingHandler);
1✔
275
    ChannelFuture regFuture = b.register();
1✔
276
    if (regFuture.isDone() && !regFuture.isSuccess()) {
1✔
277
      channel = null;
1✔
278
      // Initialization has failed badly. All new streams should be made to fail.
279
      Throwable t = regFuture.cause();
1✔
280
      if (t == null) {
1✔
281
        t = new IllegalStateException("Channel is null, but future doesn't have a cause");
×
282
      }
283
      statusExplainingWhyTheChannelIsNull = Utils.statusFromThrowable(t);
1✔
284
      // Use a Runnable since lifecycleManager calls transportListener
285
      return new Runnable() {
1✔
286
        @Override
287
        public void run() {
288
          // NOTICE: we not are calling lifecycleManager from the event loop. But there isn't really
289
          // an event loop in this case, so nothing should be accessing the lifecycleManager. We
290
          // could use GlobalEventExecutor (which is what regFuture would use for notifying
291
          // listeners in this case), but avoiding on-demand thread creation in an error case seems
292
          // a good idea and is probably clearer threading.
293
          lifecycleManager.notifyTerminated(statusExplainingWhyTheChannelIsNull);
1✔
294
        }
1✔
295
      };
296
    }
297
    channel = regFuture.channel();
1✔
298
    // For non-epoll based channel, the option will be ignored.
299
    try {
300
      if (keepAliveTimeNanos != KEEPALIVE_TIME_NANOS_DISABLED
1✔
301
              && Class.forName("io.netty.channel.epoll.AbstractEpollChannel").isInstance(channel)) {
1✔
302
        ChannelOption<Integer> tcpUserTimeout = Utils.maybeGetTcpUserTimeoutOption();
1✔
303
        if (tcpUserTimeout != null) {
1✔
304
          int tcpUserTimeoutMs = (int) TimeUnit.NANOSECONDS.toMillis(keepAliveTimeoutNanos);
1✔
305
          channel.config().setOption(tcpUserTimeout, tcpUserTimeoutMs);
1✔
306
        }
307
      }
308
    } catch (ClassNotFoundException ignored) {
×
309
      // JVM did not load AbstractEpollChannel, so the current channel will not be of epoll type,
310
      // so there is no need to set TCP_USER_TIMEOUT
311
    }
1✔
312
    // Start the write queue as soon as the channel is constructed
313
    handler.startWriteQueue(channel);
1✔
314
    // This write will have no effect, yet it will only complete once the negotiationHandler
315
    // flushes any pending writes. We need it to be staged *before* the `connect` so that
316
    // the channel can't have been closed yet, removing all handlers. This write will sit in the
317
    // AbstractBufferingHandler's buffer, and will either be flushed on a successful connection,
318
    // or failed if the connection fails.
319
    channel.writeAndFlush(NettyClientHandler.NOOP_MESSAGE).addListener(new ChannelFutureListener() {
1✔
320
      @Override
321
      public void operationComplete(ChannelFuture future) throws Exception {
322
        if (!future.isSuccess()) {
1✔
323
          // Need to notify of this failure, because NettyClientHandler may not have been added to
324
          // the pipeline before the error occurred.
325
          lifecycleManager.notifyTerminated(Utils.statusFromThrowable(future.cause()));
1✔
326
        }
327
      }
1✔
328
    });
329
    // Start the connection operation to the server.
330
    SocketAddress localAddress =
1✔
331
        localSocketPicker.createSocketAddress(remoteAddress, eagAttributes);
1✔
332
    if (localAddress != null) {
1✔
333
      channel.connect(remoteAddress, localAddress);
×
334
    } else {
335
      channel.connect(remoteAddress);
1✔
336
    }
337

338
    if (keepAliveManager != null) {
1✔
339
      keepAliveManager.onTransportStarted();
1✔
340
    }
341

342
    return null;
1✔
343
  }
344

345
  @Override
346
  public void shutdown(Status reason) {
347
    // start() could have failed
348
    if (channel == null) {
1✔
349
      return;
1✔
350
    }
351
    // Notifying of termination is automatically done when the channel closes.
352
    if (channel.isOpen()) {
1✔
353
      handler.getWriteQueue().enqueue(new GracefulCloseCommand(reason), true);
1✔
354
    }
355
  }
1✔
356

357
  @Override
358
  public void shutdownNow(final Status reason) {
359
    // Notifying of termination is automatically done when the channel closes.
360
    if (channel != null && channel.isOpen()) {
1✔
361
      handler.getWriteQueue().enqueue(new Runnable() {
1✔
362
        @Override
363
        public void run() {
364
          lifecycleManager.notifyShutdown(reason);
1✔
365
          channel.write(new ForcefulCloseCommand(reason));
1✔
366
        }
1✔
367
      }, true);
368
    }
369
  }
1✔
370

371
  @Override
372
  public String toString() {
373
    return MoreObjects.toStringHelper(this)
1✔
374
        .add("logId", logId.getId())
1✔
375
        .add("remoteAddress", remoteAddress)
1✔
376
        .add("channel", channel)
1✔
377
        .toString();
1✔
378
  }
379

380
  @Override
381
  public InternalLogId getLogId() {
382
    return logId;
1✔
383
  }
384

385
  @Override
386
  public Attributes getAttributes() {
387
    return handler.getAttributes();
1✔
388
  }
389

390
  @Override
391
  public ListenableFuture<SocketStats> getStats() {
392
    final SettableFuture<SocketStats> result = SettableFuture.create();
1✔
393
    if (channel.eventLoop().inEventLoop()) {
1✔
394
      // This is necessary, otherwise we will block forever if we get the future from inside
395
      // the event loop.
396
      result.set(getStatsHelper(channel));
×
397
      return result;
×
398
    }
399
    channel.eventLoop().submit(
1✔
400
        new Runnable() {
1✔
401
          @Override
402
          public void run() {
403
            result.set(getStatsHelper(channel));
1✔
404
          }
1✔
405
        })
406
        .addListener(
1✔
407
            new GenericFutureListener<Future<Object>>() {
1✔
408
              @Override
409
              public void operationComplete(Future<Object> future) throws Exception {
410
                if (!future.isSuccess()) {
1✔
411
                  result.setException(future.cause());
×
412
                }
413
              }
1✔
414
            });
415
    return result;
1✔
416
  }
417

418
  private SocketStats getStatsHelper(Channel ch) {
419
    assert ch.eventLoop().inEventLoop();
1✔
420
    return new SocketStats(
1✔
421
        transportTracer.getStats(),
1✔
422
        channel.localAddress(),
1✔
423
        channel.remoteAddress(),
1✔
424
        Utils.getSocketOptions(ch),
1✔
425
        handler == null ? null : handler.getSecurityInfo());
1✔
426
  }
427

428
  @VisibleForTesting
429
  Channel channel() {
430
    return channel;
1✔
431
  }
432

433
  @VisibleForTesting
434
  KeepAliveManager keepAliveManager() {
435
    return keepAliveManager;
1✔
436
  }
437

438
  /**
439
   * Convert ChannelFuture.cause() to a Status, taking into account that all handlers are removed
440
   * from the pipeline when the channel is closed. Since handlers are removed, you may get an
441
   * unhelpful exception like ClosedChannelException.
442
   *
443
   * <p>This method must only be called on the event loop.
444
   */
445
  private Status statusFromFailedFuture(ChannelFuture f) {
446
    Throwable t = f.cause();
1✔
447
    if (t instanceof ClosedChannelException
1✔
448
        // Exception thrown by the StreamBufferingEncoder if the channel is closed while there
449
        // are still streams buffered. This exception is not helpful. Replace it by the real
450
        // cause of the shutdown (if available).
451
        || t instanceof Http2ChannelClosedException) {
452
      Status shutdownStatus = lifecycleManager.getShutdownStatus();
1✔
453
      if (shutdownStatus == null) {
1✔
454
        return Status.UNKNOWN.withDescription("Channel closed but for unknown reason")
×
455
            .withCause(new ClosedChannelException().initCause(t));
×
456
      }
457
      return shutdownStatus;
1✔
458
    }
459
    return Utils.statusFromThrowable(t);
1✔
460
  }
461
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc