• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19504

10 Oct 2024 11:31PM UTC coverage: 84.654% (-0.01%) from 84.668%
#19504

push

github

web-flow
s2a: Add S2AStub cleanup handler. (#11600)

* Add S2AStub cleanup handler.

* Give TLS and Cleanup handlers name + update comment.

* Don't add TLS handler twice.

* Don't remove explicitly, since done by fireProtocolNegotiationEvent.

* plumb S2AStub close to handshake end + add integration test.

* close stub when TLS negotiation fails.

33781 of 39905 relevant lines covered (84.65%)

0.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.53
/../netty/src/main/java/io/grpc/netty/ProtocolNegotiators.java
1
/*
2
 * Copyright 2015 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.netty;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static com.google.common.base.Preconditions.checkState;
21

22
import com.google.common.annotations.VisibleForTesting;
23
import com.google.common.base.Preconditions;
24
import com.google.errorprone.annotations.ForOverride;
25
import io.grpc.Attributes;
26
import io.grpc.CallCredentials;
27
import io.grpc.ChannelCredentials;
28
import io.grpc.ChannelLogger;
29
import io.grpc.ChannelLogger.ChannelLogLevel;
30
import io.grpc.ChoiceChannelCredentials;
31
import io.grpc.ChoiceServerCredentials;
32
import io.grpc.CompositeCallCredentials;
33
import io.grpc.CompositeChannelCredentials;
34
import io.grpc.Grpc;
35
import io.grpc.InsecureChannelCredentials;
36
import io.grpc.InsecureServerCredentials;
37
import io.grpc.InternalChannelz.Security;
38
import io.grpc.InternalChannelz.Tls;
39
import io.grpc.SecurityLevel;
40
import io.grpc.ServerCredentials;
41
import io.grpc.Status;
42
import io.grpc.TlsChannelCredentials;
43
import io.grpc.TlsServerCredentials;
44
import io.grpc.internal.GrpcAttributes;
45
import io.grpc.internal.GrpcUtil;
46
import io.grpc.internal.ObjectPool;
47
import io.netty.channel.ChannelDuplexHandler;
48
import io.netty.channel.ChannelFutureListener;
49
import io.netty.channel.ChannelHandler;
50
import io.netty.channel.ChannelHandlerContext;
51
import io.netty.channel.ChannelInboundHandlerAdapter;
52
import io.netty.handler.codec.http.DefaultHttpRequest;
53
import io.netty.handler.codec.http.HttpClientCodec;
54
import io.netty.handler.codec.http.HttpClientUpgradeHandler;
55
import io.netty.handler.codec.http.HttpHeaderNames;
56
import io.netty.handler.codec.http.HttpMethod;
57
import io.netty.handler.codec.http.HttpVersion;
58
import io.netty.handler.codec.http2.Http2ClientUpgradeCodec;
59
import io.netty.handler.proxy.HttpProxyHandler;
60
import io.netty.handler.proxy.ProxyConnectionEvent;
61
import io.netty.handler.ssl.OpenSsl;
62
import io.netty.handler.ssl.OpenSslEngine;
63
import io.netty.handler.ssl.SslContext;
64
import io.netty.handler.ssl.SslContextBuilder;
65
import io.netty.handler.ssl.SslHandler;
66
import io.netty.handler.ssl.SslHandshakeCompletionEvent;
67
import io.netty.handler.ssl.SslProvider;
68
import io.netty.util.AsciiString;
69
import java.io.ByteArrayInputStream;
70
import java.net.SocketAddress;
71
import java.net.URI;
72
import java.nio.channels.ClosedChannelException;
73
import java.util.Arrays;
74
import java.util.EnumSet;
75
import java.util.Optional;
76
import java.util.Set;
77
import java.util.concurrent.Executor;
78
import java.util.logging.Level;
79
import java.util.logging.Logger;
80
import javax.annotation.Nullable;
81
import javax.net.ssl.SSLEngine;
82
import javax.net.ssl.SSLException;
83
import javax.net.ssl.SSLParameters;
84
import javax.net.ssl.SSLSession;
85

86
/**
87
 * Common {@link ProtocolNegotiator}s used by gRPC.
88
 */
89
final class ProtocolNegotiators {
90
  private static final Logger log = Logger.getLogger(ProtocolNegotiators.class.getName());
1✔
91
  private static final EnumSet<TlsChannelCredentials.Feature> understoodTlsFeatures =
1✔
92
      EnumSet.of(
1✔
93
          TlsChannelCredentials.Feature.MTLS, TlsChannelCredentials.Feature.CUSTOM_MANAGERS);
94
  private static final EnumSet<TlsServerCredentials.Feature> understoodServerTlsFeatures =
1✔
95
      EnumSet.of(
1✔
96
          TlsServerCredentials.Feature.MTLS, TlsServerCredentials.Feature.CUSTOM_MANAGERS);
97

98

99
  /** Prevents instantiation; the class is declared final and has only this private constructor. */
  private ProtocolNegotiators() {}
101

102
  public static FromChannelCredentialsResult from(ChannelCredentials creds) {
103
    if (creds instanceof TlsChannelCredentials) {
1✔
104
      TlsChannelCredentials tlsCreds = (TlsChannelCredentials) creds;
1✔
105
      Set<TlsChannelCredentials.Feature> incomprehensible =
1✔
106
          tlsCreds.incomprehensible(understoodTlsFeatures);
1✔
107
      if (!incomprehensible.isEmpty()) {
1✔
108
        return FromChannelCredentialsResult.error(
1✔
109
            "TLS features not understood: " + incomprehensible);
110
      }
111
      SslContextBuilder builder = GrpcSslContexts.forClient();
1✔
112
      if (tlsCreds.getKeyManagers() != null) {
1✔
113
        builder.keyManager(new FixedKeyManagerFactory(tlsCreds.getKeyManagers()));
1✔
114
      } else if (tlsCreds.getPrivateKey() != null) {
1✔
115
        builder.keyManager(
1✔
116
            new ByteArrayInputStream(tlsCreds.getCertificateChain()),
1✔
117
            new ByteArrayInputStream(tlsCreds.getPrivateKey()),
1✔
118
            tlsCreds.getPrivateKeyPassword());
1✔
119
      }
120
      if (tlsCreds.getTrustManagers() != null) {
1✔
121
        builder.trustManager(new FixedTrustManagerFactory(tlsCreds.getTrustManagers()));
1✔
122
      } else if (tlsCreds.getRootCertificates() != null) {
1✔
123
        builder.trustManager(new ByteArrayInputStream(tlsCreds.getRootCertificates()));
1✔
124
      } // else use system default
125
      try {
126
        return FromChannelCredentialsResult.negotiator(tlsClientFactory(builder.build()));
1✔
127
      } catch (SSLException ex) {
×
128
        log.log(Level.FINE, "Exception building SslContext", ex);
×
129
        return FromChannelCredentialsResult.error(
×
130
            "Unable to create SslContext: " + ex.getMessage());
×
131
      }
132

133
    } else if (creds instanceof InsecureChannelCredentials) {
1✔
134
      return FromChannelCredentialsResult.negotiator(plaintextClientFactory());
1✔
135

136
    } else if (creds instanceof CompositeChannelCredentials) {
1✔
137
      CompositeChannelCredentials compCreds = (CompositeChannelCredentials) creds;
1✔
138
      return from(compCreds.getChannelCredentials())
1✔
139
          .withCallCredentials(compCreds.getCallCredentials());
1✔
140

141
    } else if (creds instanceof NettyChannelCredentials) {
1✔
142
      NettyChannelCredentials nettyCreds = (NettyChannelCredentials) creds;
1✔
143
      return FromChannelCredentialsResult.negotiator(nettyCreds.getNegotiator());
1✔
144

145
    } else if (creds instanceof ChoiceChannelCredentials) {
1✔
146
      ChoiceChannelCredentials choiceCreds = (ChoiceChannelCredentials) creds;
1✔
147
      StringBuilder error = new StringBuilder();
1✔
148
      for (ChannelCredentials innerCreds : choiceCreds.getCredentialsList()) {
1✔
149
        FromChannelCredentialsResult result = from(innerCreds);
1✔
150
        if (result.error == null) {
1✔
151
          return result;
1✔
152
        }
153
        error.append(", ");
1✔
154
        error.append(result.error);
1✔
155
      }
1✔
156
      return FromChannelCredentialsResult.error(error.substring(2));
1✔
157

158
    } else {
159
      return FromChannelCredentialsResult.error(
1✔
160
          "Unsupported credential type: " + creds.getClass().getName());
1✔
161
    }
162
  }
163

164
  public static FromServerCredentialsResult from(ServerCredentials creds) {
165
    if (creds instanceof TlsServerCredentials) {
1✔
166
      TlsServerCredentials tlsCreds = (TlsServerCredentials) creds;
1✔
167
      Set<TlsServerCredentials.Feature> incomprehensible =
1✔
168
          tlsCreds.incomprehensible(understoodServerTlsFeatures);
1✔
169
      if (!incomprehensible.isEmpty()) {
1✔
170
        return FromServerCredentialsResult.error(
1✔
171
            "TLS features not understood: " + incomprehensible);
172
      }
173
      SslContextBuilder builder;
174
      if (tlsCreds.getKeyManagers() != null) {
1✔
175
        builder = GrpcSslContexts.configure(SslContextBuilder.forServer(
1✔
176
            new FixedKeyManagerFactory(tlsCreds.getKeyManagers())));
1✔
177
      } else if (tlsCreds.getPrivateKey() != null) {
1✔
178
        builder = GrpcSslContexts.forServer(
1✔
179
            new ByteArrayInputStream(tlsCreds.getCertificateChain()),
1✔
180
            new ByteArrayInputStream(tlsCreds.getPrivateKey()),
1✔
181
            tlsCreds.getPrivateKeyPassword());
1✔
182
      } else {
183
        throw new AssertionError("BUG! No key");
×
184
      }
185
      if (tlsCreds.getTrustManagers() != null) {
1✔
186
        builder.trustManager(new FixedTrustManagerFactory(tlsCreds.getTrustManagers()));
1✔
187
      } else if (tlsCreds.getRootCertificates() != null) {
1✔
188
        builder.trustManager(new ByteArrayInputStream(tlsCreds.getRootCertificates()));
1✔
189
      } // else use system default
190
      switch (tlsCreds.getClientAuth()) {
1✔
191
        case OPTIONAL:
192
          builder.clientAuth(io.netty.handler.ssl.ClientAuth.OPTIONAL);
1✔
193
          break;
1✔
194

195
        case REQUIRE:
196
          builder.clientAuth(io.netty.handler.ssl.ClientAuth.REQUIRE);
1✔
197
          break;
1✔
198

199
        case NONE:
200
          builder.clientAuth(io.netty.handler.ssl.ClientAuth.NONE);
1✔
201
          break;
1✔
202

203
        default:
204
          return FromServerCredentialsResult.error(
×
205
              "Unknown TlsServerCredentials.ClientAuth value: " + tlsCreds.getClientAuth());
×
206
      }
207
      SslContext sslContext;
208
      try {
209
        sslContext = builder.build();
1✔
210
      } catch (SSLException ex) {
×
211
        throw new IllegalArgumentException(
×
212
            "Unexpected error converting ServerCredentials to Netty SslContext", ex);
213
      }
1✔
214
      return FromServerCredentialsResult.negotiator(serverTlsFactory(sslContext));
1✔
215

216
    } else if (creds instanceof InsecureServerCredentials) {
1✔
217
      return FromServerCredentialsResult.negotiator(serverPlaintextFactory());
1✔
218

219
    } else if (creds instanceof NettyServerCredentials) {
1✔
220
      NettyServerCredentials nettyCreds = (NettyServerCredentials) creds;
1✔
221
      return FromServerCredentialsResult.negotiator(nettyCreds.getNegotiator());
1✔
222

223
    } else if (creds instanceof ChoiceServerCredentials) {
1✔
224
      ChoiceServerCredentials choiceCreds = (ChoiceServerCredentials) creds;
1✔
225
      StringBuilder error = new StringBuilder();
1✔
226
      for (ServerCredentials innerCreds : choiceCreds.getCredentialsList()) {
1✔
227
        FromServerCredentialsResult result = from(innerCreds);
1✔
228
        if (result.error == null) {
1✔
229
          return result;
1✔
230
        }
231
        error.append(", ");
1✔
232
        error.append(result.error);
1✔
233
      }
1✔
234
      return FromServerCredentialsResult.error(error.substring(2));
1✔
235

236
    } else {
237
      return FromServerCredentialsResult.error(
1✔
238
          "Unsupported credential type: " + creds.getClass().getName());
1✔
239
    }
240
  }
241

242
  public static final class FromChannelCredentialsResult {
243
    public final ProtocolNegotiator.ClientFactory negotiator;
244
    public final CallCredentials callCredentials;
245
    public final String error;
246

247
    private FromChannelCredentialsResult(ProtocolNegotiator.ClientFactory negotiator,
248
        CallCredentials creds, String error) {
1✔
249
      this.negotiator = negotiator;
1✔
250
      this.callCredentials = creds;
1✔
251
      this.error = error;
1✔
252
    }
1✔
253

254
    public static FromChannelCredentialsResult error(String error) {
255
      return new FromChannelCredentialsResult(
1✔
256
          null, null, Preconditions.checkNotNull(error, "error"));
1✔
257
    }
258

259
    public static FromChannelCredentialsResult negotiator(
260
        ProtocolNegotiator.ClientFactory factory) {
261
      return new FromChannelCredentialsResult(
1✔
262
          Preconditions.checkNotNull(factory, "factory"), null, null);
1✔
263
    }
264

265
    public FromChannelCredentialsResult withCallCredentials(CallCredentials callCreds) {
266
      Preconditions.checkNotNull(callCreds, "callCreds");
1✔
267
      if (error != null) {
1✔
268
        return this;
×
269
      }
270
      if (this.callCredentials != null) {
1✔
271
        callCreds = new CompositeCallCredentials(this.callCredentials, callCreds);
×
272
      }
273
      return new FromChannelCredentialsResult(negotiator, callCreds, null);
1✔
274
    }
275
  }
276

277
  public static final class FromServerCredentialsResult {
278
    public final ProtocolNegotiator.ServerFactory negotiator;
279
    public final String error;
280

281
    private FromServerCredentialsResult(ProtocolNegotiator.ServerFactory negotiator, String error) {
1✔
282
      this.negotiator = negotiator;
1✔
283
      this.error = error;
1✔
284
    }
1✔
285

286
    public static FromServerCredentialsResult error(String error) {
287
      return new FromServerCredentialsResult(null, Preconditions.checkNotNull(error, "error"));
1✔
288
    }
289

290
    public static FromServerCredentialsResult negotiator(ProtocolNegotiator.ServerFactory factory) {
291
      return new FromServerCredentialsResult(Preconditions.checkNotNull(factory, "factory"), null);
1✔
292
    }
293
  }
294

295
  public static ProtocolNegotiator.ServerFactory fixedServerFactory(
296
      ProtocolNegotiator negotiator) {
297
    return new FixedProtocolNegotiatorServerFactory(negotiator);
×
298
  }
299

300
  private static final class FixedProtocolNegotiatorServerFactory
301
      implements ProtocolNegotiator.ServerFactory {
302
    private final ProtocolNegotiator protocolNegotiator;
303

304
    public FixedProtocolNegotiatorServerFactory(ProtocolNegotiator protocolNegotiator) {
×
305
      this.protocolNegotiator =
×
306
          Preconditions.checkNotNull(protocolNegotiator, "protocolNegotiator");
×
307
    }
×
308

309
    @Override
310
    public ProtocolNegotiator newNegotiator(ObjectPool<? extends Executor> offloadExecutorPool) {
311
      return protocolNegotiator;
×
312
    }
313
  }
314

315
  /**
316
   * Create a server plaintext handler for gRPC.
317
   */
318
  public static ProtocolNegotiator serverPlaintext() {
319
    return new PlaintextProtocolNegotiator();
1✔
320
  }
321

322
  /**
323
   * Create a server plaintext handler factory for gRPC.
324
   */
325
  public static ProtocolNegotiator.ServerFactory serverPlaintextFactory() {
326
    return new PlaintextProtocolNegotiatorServerFactory();
1✔
327
  }
328

329
  @VisibleForTesting
330
  static final class PlaintextProtocolNegotiatorServerFactory
1✔
331
      implements ProtocolNegotiator.ServerFactory {
332
    @Override
333
    public ProtocolNegotiator newNegotiator(ObjectPool<? extends Executor> offloadExecutorPool) {
334
      return serverPlaintext();
1✔
335
    }
336
  }
337

338
  public static ProtocolNegotiator.ServerFactory serverTlsFactory(SslContext sslContext) {
339
    return new TlsProtocolNegotiatorServerFactory(sslContext);
1✔
340
  }
341

342
  @VisibleForTesting
343
  static final class TlsProtocolNegotiatorServerFactory
344
      implements ProtocolNegotiator.ServerFactory {
345
    private final SslContext sslContext;
346

347
    public TlsProtocolNegotiatorServerFactory(SslContext sslContext) {
1✔
348
      this.sslContext = Preconditions.checkNotNull(sslContext, "sslContext");
1✔
349
    }
1✔
350

351
    @Override
352
    public ProtocolNegotiator newNegotiator(ObjectPool<? extends Executor> offloadExecutorPool) {
353
      return serverTls(sslContext, offloadExecutorPool);
1✔
354
    }
355
  }
356

357
  /**
358
   * Create a server TLS handler for HTTP/2 capable of using ALPN/NPN.
359
   * @param executorPool a dedicated {@link Executor} pool for time-consuming TLS tasks
360
   */
361
  public static ProtocolNegotiator serverTls(final SslContext sslContext,
362
      final ObjectPool<? extends Executor> executorPool) {
363
    Preconditions.checkNotNull(sslContext, "sslContext");
1✔
364
    final Executor executor;
365
    if (executorPool != null) {
1✔
366
      // The handlers here can out-live the {@link ProtocolNegotiator}.
367
      // To keep their own reference to executor from executorPool, we use an extra (unused)
368
      // reference here forces the executor to stay alive, which prevents it from being re-created
369
      // for every connection.
370
      executor = executorPool.getObject();
1✔
371
    } else {
372
      executor = null;
1✔
373
    }
374
    return new ProtocolNegotiator() {
1✔
375
      @Override
376
      public ChannelHandler newHandler(GrpcHttp2ConnectionHandler handler) {
377
        ChannelHandler gnh = new GrpcNegotiationHandler(handler);
1✔
378
        ChannelHandler sth = new ServerTlsHandler(gnh, sslContext, executorPool);
1✔
379
        return new WaitUntilActiveHandler(sth, handler.getNegotiationLogger());
1✔
380
      }
381

382
      @Override
383
      public void close() {
384
        if (executorPool != null && executor != null) {
1✔
385
          executorPool.returnObject(executor);
1✔
386
        }
387
      }
1✔
388

389
      @Override
390
      public AsciiString scheme() {
391
        return Utils.HTTPS;
×
392
      }
393
    };
394
  }
395

396
  /**
397
   * Create a server TLS handler for HTTP/2 capable of using ALPN/NPN.
398
   */
399
  public static ProtocolNegotiator serverTls(final SslContext sslContext) {
400
    return serverTls(sslContext, null);
1✔
401
  }
402

403
  static final class ServerTlsHandler extends ChannelInboundHandlerAdapter {
404
    private Executor executor;
405
    private final ChannelHandler next;
406
    private final SslContext sslContext;
407

408
    private ProtocolNegotiationEvent pne = ProtocolNegotiationEvent.DEFAULT;
1✔
409

410
    ServerTlsHandler(ChannelHandler next,
411
        SslContext sslContext,
412
        final ObjectPool<? extends Executor> executorPool) {
1✔
413
      this.sslContext = checkNotNull(sslContext, "sslContext");
1✔
414
      this.next = checkNotNull(next, "next");
1✔
415
      if (executorPool != null) {
1✔
416
        this.executor = executorPool.getObject();
1✔
417
      }
418
    }
1✔
419

420
    @Override
421
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
422
      super.handlerAdded(ctx);
1✔
423
      SSLEngine sslEngine = sslContext.newEngine(ctx.alloc());
1✔
424
      ctx.pipeline().addBefore(ctx.name(), /* name= */ null, this.executor != null
1✔
425
          ? new SslHandler(sslEngine, false, this.executor)
1✔
426
          : new SslHandler(sslEngine, false));
1✔
427
    }
1✔
428

429
    @Override
430
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
431
      if (evt instanceof ProtocolNegotiationEvent) {
1✔
432
        pne = (ProtocolNegotiationEvent) evt;
1✔
433
      } else if (evt instanceof SslHandshakeCompletionEvent) {
1✔
434
        SslHandshakeCompletionEvent handshakeEvent = (SslHandshakeCompletionEvent) evt;
1✔
435
        if (!handshakeEvent.isSuccess()) {
1✔
436
          logSslEngineDetails(Level.FINE, ctx, "TLS negotiation failed for new client.", null);
1✔
437
          ctx.fireExceptionCaught(handshakeEvent.cause());
1✔
438
          return;
1✔
439
        }
440
        SslHandler sslHandler = ctx.pipeline().get(SslHandler.class);
1✔
441
        if (!sslContext.applicationProtocolNegotiator().protocols().contains(
1✔
442
                sslHandler.applicationProtocol())) {
1✔
443
          logSslEngineDetails(Level.FINE, ctx, "TLS negotiation failed for new client.", null);
1✔
444
          ctx.fireExceptionCaught(unavailableException(
1✔
445
              "Failed protocol negotiation: Unable to find compatible protocol"));
446
          return;
1✔
447
        }
448
        ctx.pipeline().replace(ctx.name(), null, next);
1✔
449
        fireProtocolNegotiationEvent(ctx, sslHandler.engine().getSession());
1✔
450
      } else {
1✔
451
        super.userEventTriggered(ctx, evt);
1✔
452
      }
453
    }
1✔
454

455
    private void fireProtocolNegotiationEvent(ChannelHandlerContext ctx, SSLSession session) {
456
      Security security = new Security(new Tls(session));
1✔
457
      Attributes attrs = pne.getAttributes().toBuilder()
1✔
458
          .set(GrpcAttributes.ATTR_SECURITY_LEVEL, SecurityLevel.PRIVACY_AND_INTEGRITY)
1✔
459
          .set(Grpc.TRANSPORT_ATTR_SSL_SESSION, session)
1✔
460
          .build();
1✔
461
      ctx.fireUserEventTriggered(pne.withAttributes(attrs).withSecurity(security));
1✔
462
    }
1✔
463
  }
464

465
  /**
466
   * Returns a {@link ProtocolNegotiator} that does HTTP CONNECT proxy negotiation.
467
   */
468
  public static ProtocolNegotiator httpProxy(final SocketAddress proxyAddress,
469
      final @Nullable String proxyUsername, final @Nullable String proxyPassword,
470
      final ProtocolNegotiator negotiator) {
471
    checkNotNull(negotiator, "negotiator");
1✔
472
    checkNotNull(proxyAddress, "proxyAddress");
1✔
473
    final AsciiString scheme = negotiator.scheme();
1✔
474
    class ProxyNegotiator implements ProtocolNegotiator {
1✔
475
      @Override
476
      public ChannelHandler newHandler(GrpcHttp2ConnectionHandler http2Handler) {
477
        ChannelHandler protocolNegotiationHandler = negotiator.newHandler(http2Handler);
1✔
478
        ChannelLogger negotiationLogger = http2Handler.getNegotiationLogger();
1✔
479
        return new ProxyProtocolNegotiationHandler(
1✔
480
            proxyAddress, proxyUsername, proxyPassword, protocolNegotiationHandler,
481
            negotiationLogger);
482
      }
483

484
      @Override
485
      public AsciiString scheme() {
486
        return scheme;
×
487
      }
488

489
      // This method is not normally called, because we use httpProxy on a per-connection basis in
490
      // NettyChannelBuilder. Instead, we expect `negotiator' to be closed by NettyTransportFactory.
491
      @Override
492
      public void close() {
493
        negotiator.close();
×
494
      }
×
495
    }
496

497
    return new ProxyNegotiator();
1✔
498
  }
499

500
  /**
501
   * A Proxy handler follows {@link ProtocolNegotiationHandler} pattern. Upon successful proxy
502
   * connection, this handler will install {@code next} handler which should be a handler from
503
   * other type of {@link ProtocolNegotiator} to continue negotiating protocol using proxy.
504
   */
505
  static final class ProxyProtocolNegotiationHandler extends ProtocolNegotiationHandler {
506

507
    private final SocketAddress address;
508
    @Nullable private final String userName;
509
    @Nullable private final String password;
510

511
    public ProxyProtocolNegotiationHandler(
512
        SocketAddress address,
513
        @Nullable String userName,
514
        @Nullable String password,
515
        ChannelHandler next,
516
        ChannelLogger negotiationLogger) {
517
      super(next, negotiationLogger);
1✔
518
      this.address = checkNotNull(address, "address");
1✔
519
      this.userName = userName;
1✔
520
      this.password = password;
1✔
521
    }
1✔
522

523
    @Override
524
    protected void protocolNegotiationEventTriggered(ChannelHandlerContext ctx) {
525
      HttpProxyHandler nettyProxyHandler;
526
      if (userName == null || password == null) {
1✔
527
        nettyProxyHandler = new HttpProxyHandler(address);
1✔
528
      } else {
529
        nettyProxyHandler = new HttpProxyHandler(address, userName, password);
×
530
      }
531
      ctx.pipeline().addBefore(ctx.name(), /* name= */ null, nettyProxyHandler);
1✔
532
    }
1✔
533

534
    @Override
535
    protected void userEventTriggered0(ChannelHandlerContext ctx, Object evt) throws Exception {
536
      if (evt instanceof ProxyConnectionEvent) {
1✔
537
        fireProtocolNegotiationEvent(ctx);
1✔
538
      } else {
539
        super.userEventTriggered(ctx, evt);
×
540
      }
541
    }
1✔
542
  }
543

544
  static final class ClientTlsProtocolNegotiator implements ProtocolNegotiator {
545

546
    public ClientTlsProtocolNegotiator(SslContext sslContext,
547
        ObjectPool<? extends Executor> executorPool, Optional<Runnable> handshakeCompleteRunnable) {
1✔
548
      this.sslContext = checkNotNull(sslContext, "sslContext");
1✔
549
      this.executorPool = executorPool;
1✔
550
      if (this.executorPool != null) {
1✔
551
        this.executor = this.executorPool.getObject();
1✔
552
      }
553
      this.handshakeCompleteRunnable = handshakeCompleteRunnable;
1✔
554
    }
1✔
555

556
    private final SslContext sslContext;
557
    private final ObjectPool<? extends Executor> executorPool;
558
    private final Optional<Runnable> handshakeCompleteRunnable;
559
    private Executor executor;
560

561
    @Override
562
    public AsciiString scheme() {
563
      return Utils.HTTPS;
1✔
564
    }
565

566
    @Override
567
    public ChannelHandler newHandler(GrpcHttp2ConnectionHandler grpcHandler) {
568
      ChannelHandler gnh = new GrpcNegotiationHandler(grpcHandler);
1✔
569
      ChannelLogger negotiationLogger = grpcHandler.getNegotiationLogger();
1✔
570
      ChannelHandler cth = new ClientTlsHandler(gnh, sslContext, grpcHandler.getAuthority(),
1✔
571
          this.executor, negotiationLogger, handshakeCompleteRunnable);
572
      return new WaitUntilActiveHandler(cth, negotiationLogger);
1✔
573
    }
574

575
    @Override
576
    public void close() {
577
      if (this.executorPool != null && this.executor != null) {
1✔
578
        this.executorPool.returnObject(this.executor);
1✔
579
      }
580
    }
1✔
581
  }
582

583
  static final class ClientTlsHandler extends ProtocolNegotiationHandler {
584

585
    private final SslContext sslContext;
586
    private final String host;
587
    private final int port;
588
    private Executor executor;
589
    private final Optional<Runnable> handshakeCompleteRunnable;
590

591
    ClientTlsHandler(ChannelHandler next, SslContext sslContext, String authority,
592
        Executor executor, ChannelLogger negotiationLogger,
593
        Optional<Runnable> handshakeCompleteRunnable) {
594
      super(next, negotiationLogger);
1✔
595
      this.sslContext = checkNotNull(sslContext, "sslContext");
1✔
596
      HostPort hostPort = parseAuthority(authority);
1✔
597
      this.host = hostPort.host;
1✔
598
      this.port = hostPort.port;
1✔
599
      this.executor = executor;
1✔
600
      this.handshakeCompleteRunnable = handshakeCompleteRunnable;
1✔
601
    }
1✔
602

603
    @Override
604
    protected void handlerAdded0(ChannelHandlerContext ctx) {
605
      SSLEngine sslEngine = sslContext.newEngine(ctx.alloc(), host, port);
1✔
606
      SSLParameters sslParams = sslEngine.getSSLParameters();
1✔
607
      sslParams.setEndpointIdentificationAlgorithm("HTTPS");
1✔
608
      sslEngine.setSSLParameters(sslParams);
1✔
609
      ctx.pipeline().addBefore(ctx.name(), /* name= */ null, this.executor != null
1✔
610
          ? new SslHandler(sslEngine, false, this.executor)
1✔
611
          : new SslHandler(sslEngine, false));
1✔
612
    }
1✔
613

614
    @Override
615
    protected void userEventTriggered0(ChannelHandlerContext ctx, Object evt) throws Exception {
616
      if (evt instanceof SslHandshakeCompletionEvent) {
1✔
617
        SslHandshakeCompletionEvent handshakeEvent = (SslHandshakeCompletionEvent) evt;
1✔
618
        if (handshakeEvent.isSuccess()) {
1✔
619
          SslHandler handler = ctx.pipeline().get(SslHandler.class);
1✔
620
          if (sslContext.applicationProtocolNegotiator().protocols()
1✔
621
              .contains(handler.applicationProtocol())) {
1✔
622
            // Successfully negotiated the protocol.
623
            logSslEngineDetails(Level.FINER, ctx, "TLS negotiation succeeded.", null);
1✔
624
            propagateTlsComplete(ctx, handler.engine().getSession());
1✔
625
          } else {
626
            Exception ex =
1✔
627
                unavailableException("Failed ALPN negotiation: Unable to find compatible protocol");
1✔
628
            logSslEngineDetails(Level.FINE, ctx, "TLS negotiation failed.", ex);
1✔
629
            if (handshakeCompleteRunnable.isPresent()) {
1✔
630
              handshakeCompleteRunnable.get().run();
×
631
            }
632
            ctx.fireExceptionCaught(ex);
1✔
633
          }
634
        } else {
1✔
635
          Throwable t = handshakeEvent.cause();
1✔
636
          if (t instanceof ClosedChannelException) {
1✔
637
            // On channelInactive(), SslHandler creates its own ClosedChannelException and
638
            // propagates it before the actual channelInactive(). So we assume here that any
639
            // such exception is from channelInactive() and emulate the normal behavior of
640
            // WriteBufferingAndExceptionHandler
641
            t = Status.UNAVAILABLE
1✔
642
                .withDescription("Connection closed while performing TLS negotiation")
1✔
643
                .withCause(t)
1✔
644
                .asRuntimeException();
1✔
645
          }
646
          if (handshakeCompleteRunnable.isPresent()) {
1✔
647
            handshakeCompleteRunnable.get().run();
×
648
          }
649
          ctx.fireExceptionCaught(t);
1✔
650
        }
651
      } else {
1✔
652
        super.userEventTriggered0(ctx, evt);
1✔
653
      }
654
    }
1✔
655

656
    private void propagateTlsComplete(ChannelHandlerContext ctx, SSLSession session) {
657
      Security security = new Security(new Tls(session));
1✔
658
      ProtocolNegotiationEvent existingPne = getProtocolNegotiationEvent();
1✔
659
      Attributes attrs = existingPne.getAttributes().toBuilder()
1✔
660
          .set(GrpcAttributes.ATTR_SECURITY_LEVEL, SecurityLevel.PRIVACY_AND_INTEGRITY)
1✔
661
          .set(Grpc.TRANSPORT_ATTR_SSL_SESSION, session)
1✔
662
          .build();
1✔
663
      replaceProtocolNegotiationEvent(existingPne.withAttributes(attrs).withSecurity(security));
1✔
664
      if (handshakeCompleteRunnable.isPresent()) {
1✔
665
        handshakeCompleteRunnable.get().run();
×
666
      }
667
      fireProtocolNegotiationEvent(ctx);
1✔
668
    }
1✔
669
  }
670

671
  @VisibleForTesting
672
  static HostPort parseAuthority(String authority) {
673
    URI uri = GrpcUtil.authorityToUri(Preconditions.checkNotNull(authority, "authority"));
1✔
674
    String host;
675
    int port;
676
    if (uri.getHost() != null) {
1✔
677
      host = uri.getHost();
1✔
678
      port = uri.getPort();
1✔
679
    } else {
680
      /*
681
       * Implementation note: We pick -1 as the port here rather than deriving it from the
682
       * original socket address.  The SSL engine doesn't use this port number when contacting the
683
       * remote server, but rather it is used for other things like SSL Session caching.  When an
684
       * invalid authority is provided (like "bad_cert"), picking the original port and passing it
685
       * in would mean that the port might used under the assumption that it was correct.   By
686
       * using -1 here, it forces the SSL implementation to treat it as invalid.
687
       */
688
      host = authority;
1✔
689
      port = -1;
1✔
690
    }
691
    return new HostPort(host, port);
1✔
692
  }
693

694
  /**
695
   * Returns a {@link ProtocolNegotiator} that ensures the pipeline is set up so that TLS will
696
   * be negotiated, the {@code handler} is added and writes to the {@link io.netty.channel.Channel}
697
   * may happen immediately, even before the TLS Handshake is complete.
698
   * @param executorPool a dedicated {@link Executor} pool for time-consuming TLS tasks
699
   */
700
  public static ProtocolNegotiator tls(SslContext sslContext,
701
      ObjectPool<? extends Executor> executorPool, Optional<Runnable> handshakeCompleteRunnable) {
702
    return new ClientTlsProtocolNegotiator(sslContext, executorPool, handshakeCompleteRunnable);
1✔
703
  }
704

705
  /**
706
   * Returns a {@link ProtocolNegotiator} that ensures the pipeline is set up so that TLS will
707
   * be negotiated, the {@code handler} is added and writes to the {@link io.netty.channel.Channel}
708
   * may happen immediately, even before the TLS Handshake is complete.
709
   */
710
  public static ProtocolNegotiator tls(SslContext sslContext) {
711
    return tls(sslContext, null, Optional.empty());
1✔
712
  }
713

714
  public static ProtocolNegotiator.ClientFactory tlsClientFactory(SslContext sslContext) {
715
    return new TlsProtocolNegotiatorClientFactory(sslContext);
1✔
716
  }
717

718
  @VisibleForTesting
719
  static final class TlsProtocolNegotiatorClientFactory
720
      implements ProtocolNegotiator.ClientFactory {
721
    private final SslContext sslContext;
722

723
    public TlsProtocolNegotiatorClientFactory(SslContext sslContext) {
1✔
724
      this.sslContext = Preconditions.checkNotNull(sslContext, "sslContext");
1✔
725
    }
1✔
726

727
    @Override public ProtocolNegotiator newNegotiator() {
728
      return tls(sslContext);
1✔
729
    }
730

731
    @Override public int getDefaultPort() {
732
      return GrpcUtil.DEFAULT_PORT_SSL;
1✔
733
    }
734
  }
735

736
  /** A tuple of (host, port). */
737
  @VisibleForTesting
738
  static final class HostPort {
739
    final String host;
740
    final int port;
741

742
    public HostPort(String host, int port) {
1✔
743
      this.host = host;
1✔
744
      this.port = port;
1✔
745
    }
1✔
746
  }
747

748
  /**
749
   * Returns a {@link ProtocolNegotiator} used for upgrading to HTTP/2 from HTTP/1.x.
750
   */
751
  public static ProtocolNegotiator plaintextUpgrade() {
752
    return new PlaintextUpgradeProtocolNegotiator();
1✔
753
  }
754

755
  public static ProtocolNegotiator.ClientFactory plaintextUpgradeClientFactory() {
756
    return new PlaintextUpgradeProtocolNegotiatorClientFactory();
×
757
  }
758

759
  private static final class PlaintextUpgradeProtocolNegotiatorClientFactory
760
      implements ProtocolNegotiator.ClientFactory {
761
    @Override public ProtocolNegotiator newNegotiator() {
762
      return plaintextUpgrade();
×
763
    }
764

765
    @Override public int getDefaultPort() {
766
      return GrpcUtil.DEFAULT_PORT_PLAINTEXT;
×
767
    }
768
  }
769

770
  static final class PlaintextUpgradeProtocolNegotiator implements ProtocolNegotiator {
1✔
771

772
    @Override
773
    public AsciiString scheme() {
774
      return Utils.HTTP;
×
775
    }
776

777
    @Override
778
    public ChannelHandler newHandler(GrpcHttp2ConnectionHandler grpcHandler) {
779
      ChannelHandler upgradeHandler =
1✔
780
          new Http2UpgradeAndGrpcHandler(grpcHandler.getAuthority(), grpcHandler);
1✔
781
      return new WaitUntilActiveHandler(upgradeHandler, grpcHandler.getNegotiationLogger());
1✔
782
    }
783

784
    @Override
785
    public void close() {}
1✔
786
  }
787

788
  /**
789
   * Acts as a combination of Http2Upgrade and {@link GrpcNegotiationHandler}.  Unfortunately,
790
   * this negotiator doesn't follow the pattern of "just one handler doing negotiation at a time."
791
   * This is due to the tight coupling between the upgrade handler and the HTTP/2 handler.
792
   */
793
  static final class Http2UpgradeAndGrpcHandler extends ChannelInboundHandlerAdapter {
794

795
    private final String authority;
796
    private final GrpcHttp2ConnectionHandler next;
797
    private final ChannelLogger negotiationLogger;
798

799
    private ProtocolNegotiationEvent pne;
800

801
    Http2UpgradeAndGrpcHandler(String authority, GrpcHttp2ConnectionHandler next) {
1✔
802
      this.authority = checkNotNull(authority, "authority");
1✔
803
      this.next = checkNotNull(next, "next");
1✔
804
      this.negotiationLogger = next.getNegotiationLogger();
1✔
805
    }
1✔
806

807
    @Override
808
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
809
      negotiationLogger.log(ChannelLogLevel.INFO, "Http2Upgrade started");
1✔
810
      HttpClientCodec httpClientCodec = new HttpClientCodec();
1✔
811
      ctx.pipeline().addBefore(ctx.name(), null, httpClientCodec);
1✔
812

813
      Http2ClientUpgradeCodec upgradeCodec = new Http2ClientUpgradeCodec(next);
1✔
814
      HttpClientUpgradeHandler upgrader =
1✔
815
          new HttpClientUpgradeHandler(httpClientCodec, upgradeCodec, /*maxContentLength=*/ 1000);
816
      ctx.pipeline().addBefore(ctx.name(), null, upgrader);
1✔
817

818
      // Trigger the HTTP/1.1 plaintext upgrade protocol by issuing an HTTP request
819
      // which causes the upgrade headers to be added
820
      DefaultHttpRequest upgradeTrigger =
1✔
821
          new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/");
822
      upgradeTrigger.headers().add(HttpHeaderNames.HOST, authority);
1✔
823
      ctx.writeAndFlush(upgradeTrigger).addListener(
1✔
824
          ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
825
      super.handlerAdded(ctx);
1✔
826
    }
1✔
827

828
    @Override
829
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
830
      if (evt instanceof ProtocolNegotiationEvent) {
1✔
831
        checkState(pne == null, "negotiation already started");
1✔
832
        pne = (ProtocolNegotiationEvent) evt;
1✔
833
      } else if (evt == HttpClientUpgradeHandler.UpgradeEvent.UPGRADE_SUCCESSFUL) {
1✔
834
        checkState(pne != null, "negotiation not yet complete");
1✔
835
        negotiationLogger.log(ChannelLogLevel.INFO, "Http2Upgrade finished");
1✔
836
        ctx.pipeline().remove(ctx.name());
1✔
837
        next.handleProtocolNegotiationCompleted(pne.getAttributes(), pne.getSecurity());
1✔
838
      } else if (evt == HttpClientUpgradeHandler.UpgradeEvent.UPGRADE_REJECTED) {
1✔
839
        ctx.fireExceptionCaught(unavailableException("HTTP/2 upgrade rejected"));
×
840
      } else {
841
        super.userEventTriggered(ctx, evt);
1✔
842
      }
843
    }
1✔
844
  }
845

846
  /**
847
   * Returns a {@link ChannelHandler} that ensures that the {@code handler} is added to the
848
   * pipeline writes to the {@link io.netty.channel.Channel} may happen immediately, even before it
849
   * is active.
850
   */
851
  public static ProtocolNegotiator plaintext() {
852
    return new PlaintextProtocolNegotiator();
1✔
853
  }
854

855
  public static ProtocolNegotiator.ClientFactory plaintextClientFactory() {
856
    return new PlaintextProtocolNegotiatorClientFactory();
1✔
857
  }
858

859
  @VisibleForTesting
860
  static final class PlaintextProtocolNegotiatorClientFactory
1✔
861
      implements ProtocolNegotiator.ClientFactory {
862
    @Override public ProtocolNegotiator newNegotiator() {
863
      return plaintext();
1✔
864
    }
865

866
    @Override public int getDefaultPort() {
867
      return GrpcUtil.DEFAULT_PORT_PLAINTEXT;
1✔
868
    }
869
  }
870

871
  private static RuntimeException unavailableException(String msg) {
872
    return Status.UNAVAILABLE.withDescription(msg).asRuntimeException();
1✔
873
  }
874

875
  @VisibleForTesting
876
  static void logSslEngineDetails(Level level, ChannelHandlerContext ctx, String msg,
877
      @Nullable Throwable t) {
878
    if (!log.isLoggable(level)) {
1✔
879
      return;
1✔
880
    }
881

882
    SslHandler sslHandler = ctx.pipeline().get(SslHandler.class);
1✔
883
    SSLEngine engine = sslHandler.engine();
1✔
884

885
    StringBuilder builder = new StringBuilder(msg);
1✔
886
    builder.append("\nSSLEngine Details: [\n");
1✔
887
    if (engine instanceof OpenSslEngine) {
1✔
888
      builder.append("    OpenSSL, ");
1✔
889
      builder.append("Version: 0x").append(Integer.toHexString(OpenSsl.version()));
1✔
890
      builder.append(" (").append(OpenSsl.versionString()).append("), ");
1✔
891
      builder.append("ALPN supported: ").append(SslProvider.isAlpnSupported(SslProvider.OPENSSL));
1✔
892
    } else if (JettyTlsUtil.isJettyAlpnConfigured()) {
×
893
      builder.append("    Jetty ALPN");
×
894
    } else if (JettyTlsUtil.isJettyNpnConfigured()) {
×
895
      builder.append("    Jetty NPN");
×
896
    } else if (JettyTlsUtil.isJava9AlpnAvailable()) {
×
897
      builder.append("    JDK9 ALPN");
×
898
    }
899
    builder.append("\n    TLS Protocol: ");
1✔
900
    builder.append(engine.getSession().getProtocol());
1✔
901
    builder.append("\n    Application Protocol: ");
1✔
902
    builder.append(sslHandler.applicationProtocol());
1✔
903
    builder.append("\n    Need Client Auth: " );
1✔
904
    builder.append(engine.getNeedClientAuth());
1✔
905
    builder.append("\n    Want Client Auth: ");
1✔
906
    builder.append(engine.getWantClientAuth());
1✔
907
    builder.append("\n    Supported protocols=");
1✔
908
    builder.append(Arrays.toString(engine.getSupportedProtocols()));
1✔
909
    builder.append("\n    Enabled protocols=");
1✔
910
    builder.append(Arrays.toString(engine.getEnabledProtocols()));
1✔
911
    builder.append("\n    Supported ciphers=");
1✔
912
    builder.append(Arrays.toString(engine.getSupportedCipherSuites()));
1✔
913
    builder.append("\n    Enabled ciphers=");
1✔
914
    builder.append(Arrays.toString(engine.getEnabledCipherSuites()));
1✔
915
    builder.append("\n]");
1✔
916

917
    log.log(level, builder.toString(), t);
1✔
918
  }
1✔
919

920
  /**
921
   * Adapts a {@link ProtocolNegotiationEvent} to the {@link GrpcHttp2ConnectionHandler}.
922
   */
923
  static final class GrpcNegotiationHandler extends ChannelInboundHandlerAdapter {
924
    private final GrpcHttp2ConnectionHandler next;
925

926
    public GrpcNegotiationHandler(GrpcHttp2ConnectionHandler next) {
1✔
927
      this.next = checkNotNull(next, "next");
1✔
928
    }
1✔
929

930
    @Override
931
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
932
      if (evt instanceof ProtocolNegotiationEvent) {
1✔
933
        ProtocolNegotiationEvent protocolNegotiationEvent = (ProtocolNegotiationEvent) evt;
1✔
934
        ctx.pipeline().replace(ctx.name(), null, next);
1✔
935
        next.handleProtocolNegotiationCompleted(
1✔
936
            protocolNegotiationEvent.getAttributes(), protocolNegotiationEvent.getSecurity());
1✔
937
      } else {
1✔
938
        super.userEventTriggered(ctx, evt);
×
939
      }
940
    }
1✔
941
  }
942

943
  /*
944
   * Common {@link ProtocolNegotiator}s used by gRPC.  Protocol negotiation follows a pattern to
945
   * simplify the pipeline.   The pipeline should look like:
946
   *
947
   * 1.  {@link ProtocolNegotiator#newHandler() PN.H}, created.
948
   * 2.  [Tail], {@link WriteBufferingAndExceptionHandler WBAEH}, [Head]
949
   * 3.  [Tail], WBAEH, PN.H, [Head]
950
   *
951
   * <p>Typically, PN.H will be an instance of {@link InitHandler IH}, which is a trivial handler
952
   * that can return the {@code scheme()} of the negotiation.  IH, and each handler after,
953
   * replaces itself with a "next" handler once its part of negotiation is complete.  This keeps
954
   * the pipeline small, and limits the interaction between handlers.
955
   *
956
   * <p>Additionally, each handler may fire a {@link ProtocolNegotiationEvent PNE} just after
957
   * replacing itself.  Handlers should capture user events of type PNE, and re-trigger the events
958
   * once that handler's part of negotiation is complete.  This can be seen in the
959
   * {@link WaitUntilActiveHandler WUAH}, which waits until the channel is active.  Once active, it
960
   * replaces itself with the next handler, and fires a PNE containing the addresses.  Continuing
961
   * with IH and WUAH:
962
   *
963
   * 3.  [Tail], WBAEH, IH, [Head]
964
   * 4.  [Tail], WBAEH, WUAH, [Head]
965
   * 5.  [Tail], WBAEH, {@link GrpcNegotiationHandler}, [Head]
966
   * 6a. [Tail], WBAEH, {@link GrpcHttp2ConnectionHandler GHCH}, [Head]
967
   * 6b. [Tail], GHCH, [Head]
968
   */
969

970
  /**
971
   * A negotiator that only does plain text.
972
   */
973
  static final class PlaintextProtocolNegotiator implements ProtocolNegotiator {
1✔
974

975
    @Override
976
    public ChannelHandler newHandler(GrpcHttp2ConnectionHandler grpcHandler) {
977
      ChannelHandler grpcNegotiationHandler = new GrpcNegotiationHandler(grpcHandler);
1✔
978
      ChannelHandler activeHandler = new WaitUntilActiveHandler(grpcNegotiationHandler,
1✔
979
          grpcHandler.getNegotiationLogger());
1✔
980
      return activeHandler;
1✔
981
    }
982

983
    @Override
984
    public void close() {}
1✔
985

986
    @Override
987
    public AsciiString scheme() {
988
      return Utils.HTTP;
1✔
989
    }
990
  }
991

992
  /**
993
   * Waits for the channel to be active, and then installs the next Handler.  Using this allows
994
   * subsequent handlers to assume the channel is active and ready to send.  Additionally, this a
995
   * {@link ProtocolNegotiationEvent}, with the connection addresses.
996
   */
997
  static final class WaitUntilActiveHandler extends ProtocolNegotiationHandler {
998

999
    boolean protocolNegotiationEventReceived;
1000

1001
    WaitUntilActiveHandler(ChannelHandler next, ChannelLogger negotiationLogger) {
1002
      super(next, negotiationLogger);
1✔
1003
    }
1✔
1004

1005
    @Override
1006
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
1007
      if (protocolNegotiationEventReceived) {
1✔
1008
        replaceOnActive(ctx);
1✔
1009
        fireProtocolNegotiationEvent(ctx);
1✔
1010
      }
1011
      // Still propagate channelActive to the new handler.
1012
      super.channelActive(ctx);
1✔
1013
    }
1✔
1014

1015
    @Override
1016
    protected void protocolNegotiationEventTriggered(ChannelHandlerContext ctx) {
1017
      protocolNegotiationEventReceived = true;
1✔
1018
      if (ctx.channel().isActive()) {
1✔
1019
        replaceOnActive(ctx);
1✔
1020
        fireProtocolNegotiationEvent(ctx);
1✔
1021
      }
1022
    }
1✔
1023

1024
    private void replaceOnActive(ChannelHandlerContext ctx) {
1025
      ProtocolNegotiationEvent existingPne = getProtocolNegotiationEvent();
1✔
1026
      Attributes attrs = existingPne.getAttributes().toBuilder()
1✔
1027
          .set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, ctx.channel().localAddress())
1✔
1028
          .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, ctx.channel().remoteAddress())
1✔
1029
          // Later handlers are expected to overwrite this.
1030
          .set(GrpcAttributes.ATTR_SECURITY_LEVEL, SecurityLevel.NONE)
1✔
1031
          .build();
1✔
1032
      replaceProtocolNegotiationEvent(existingPne.withAttributes(attrs));
1✔
1033
    }
1✔
1034
  }
1035

1036
  /**
1037
   * ProtocolNegotiationHandler is a convenience handler that makes it easy to follow the rules for
1038
   * protocol negotiation.  Handlers should strongly consider extending this handler.
1039
   */
1040
  static class ProtocolNegotiationHandler extends ChannelDuplexHandler {
1041

1042
    private final ChannelHandler next;
1043
    private final String negotiatorName;
1044
    private ProtocolNegotiationEvent pne;
1045
    private final ChannelLogger negotiationLogger;
1046

1047
    protected ProtocolNegotiationHandler(ChannelHandler next, String negotiatorName,
1048
        ChannelLogger negotiationLogger) {
×
1049
      this.next = checkNotNull(next, "next");
×
1050
      this.negotiatorName = negotiatorName;
×
1051
      this.negotiationLogger = checkNotNull(negotiationLogger, "negotiationLogger");
×
1052
    }
×
1053

1054
    protected ProtocolNegotiationHandler(ChannelHandler next, ChannelLogger negotiationLogger) {
1✔
1055
      this.next = checkNotNull(next, "next");
1✔
1056
      this.negotiatorName = getClass().getSimpleName().replace("Handler", "");
1✔
1057
      this.negotiationLogger = checkNotNull(negotiationLogger, "negotiationLogger");
1✔
1058
    }
1✔
1059

1060
    @Override
1061
    public final void handlerAdded(ChannelHandlerContext ctx) throws Exception {
1062
      negotiationLogger.log(ChannelLogLevel.DEBUG, "{0} started", negotiatorName);
1✔
1063
      handlerAdded0(ctx);
1✔
1064
    }
1✔
1065

1066
    @ForOverride
1067
    protected void handlerAdded0(ChannelHandlerContext ctx) throws Exception {
1068
      super.handlerAdded(ctx);
1✔
1069
    }
1✔
1070

1071
    @Override
1072
    public final void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
1073
      if (evt instanceof ProtocolNegotiationEvent) {
1✔
1074
        checkState(pne == null, "pre-existing negotiation: %s < %s", pne, evt);
1✔
1075
        pne = (ProtocolNegotiationEvent) evt;
1✔
1076
        protocolNegotiationEventTriggered(ctx);
1✔
1077
      } else {
1078
        userEventTriggered0(ctx, evt);
1✔
1079
      }
1080
    }
1✔
1081

1082
    protected void userEventTriggered0(ChannelHandlerContext ctx, Object evt) throws Exception {
1083
      super.userEventTriggered(ctx, evt);
1✔
1084
    }
1✔
1085

1086
    @ForOverride
1087
    protected void protocolNegotiationEventTriggered(ChannelHandlerContext ctx) {
1088
      // no-op
1089
    }
1✔
1090

1091
    protected final ProtocolNegotiationEvent getProtocolNegotiationEvent() {
1092
      checkState(pne != null, "previous protocol negotiation event hasn't triggered");
1✔
1093
      return pne;
1✔
1094
    }
1095

1096
    protected final void replaceProtocolNegotiationEvent(ProtocolNegotiationEvent pne) {
1097
      checkState(this.pne != null, "previous protocol negotiation event hasn't triggered");
1✔
1098
      this.pne = checkNotNull(pne);
1✔
1099
    }
1✔
1100

1101
    protected final void fireProtocolNegotiationEvent(ChannelHandlerContext ctx) {
1102
      checkState(pne != null, "previous protocol negotiation event hasn't triggered");
1✔
1103
      negotiationLogger.log(ChannelLogLevel.INFO, "{0} completed", negotiatorName);
1✔
1104
      ctx.pipeline().replace(ctx.name(), /* newName= */ null, next);
1✔
1105
      ctx.fireUserEventTriggered(pne);
1✔
1106
    }
1✔
1107
  }
1108
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc