• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19180

29 Apr 2024 11:30PM UTC coverage: 88.296% (-0.06%) from 88.351%
#19180

push

github

web-flow
Plumb optional labels from LB to ClientStreamTracer

As part of gRFC A78:

> To support the locality label in the per-call metrics, we will provide
> a mechanism for LB picker to add optional labels to the call attempt
> tracer.

31435 of 35602 relevant lines covered (88.3%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.51
/../core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
1
/*
2
 * Copyright 2016 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static com.google.common.base.Preconditions.checkState;
22
import static io.grpc.ClientStreamTracer.NAME_RESOLUTION_DELAYED;
23
import static io.grpc.ConnectivityState.CONNECTING;
24
import static io.grpc.ConnectivityState.IDLE;
25
import static io.grpc.ConnectivityState.SHUTDOWN;
26
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
27
import static io.grpc.EquivalentAddressGroup.ATTR_AUTHORITY_OVERRIDE;
28

29
import com.google.common.annotations.VisibleForTesting;
30
import com.google.common.base.MoreObjects;
31
import com.google.common.base.Stopwatch;
32
import com.google.common.base.Supplier;
33
import com.google.common.util.concurrent.ListenableFuture;
34
import com.google.common.util.concurrent.SettableFuture;
35
import io.grpc.Attributes;
36
import io.grpc.CallCredentials;
37
import io.grpc.CallOptions;
38
import io.grpc.Channel;
39
import io.grpc.ChannelCredentials;
40
import io.grpc.ChannelLogger;
41
import io.grpc.ChannelLogger.ChannelLogLevel;
42
import io.grpc.ClientCall;
43
import io.grpc.ClientInterceptor;
44
import io.grpc.ClientInterceptors;
45
import io.grpc.ClientStreamTracer;
46
import io.grpc.ClientTransportFilter;
47
import io.grpc.CompressorRegistry;
48
import io.grpc.ConnectivityState;
49
import io.grpc.ConnectivityStateInfo;
50
import io.grpc.Context;
51
import io.grpc.Deadline;
52
import io.grpc.DecompressorRegistry;
53
import io.grpc.EquivalentAddressGroup;
54
import io.grpc.ForwardingChannelBuilder2;
55
import io.grpc.ForwardingClientCall;
56
import io.grpc.Grpc;
57
import io.grpc.InternalChannelz;
58
import io.grpc.InternalChannelz.ChannelStats;
59
import io.grpc.InternalChannelz.ChannelTrace;
60
import io.grpc.InternalConfigSelector;
61
import io.grpc.InternalInstrumented;
62
import io.grpc.InternalLogId;
63
import io.grpc.InternalWithLogId;
64
import io.grpc.LoadBalancer;
65
import io.grpc.LoadBalancer.CreateSubchannelArgs;
66
import io.grpc.LoadBalancer.PickResult;
67
import io.grpc.LoadBalancer.PickSubchannelArgs;
68
import io.grpc.LoadBalancer.ResolvedAddresses;
69
import io.grpc.LoadBalancer.SubchannelPicker;
70
import io.grpc.LoadBalancer.SubchannelStateListener;
71
import io.grpc.ManagedChannel;
72
import io.grpc.ManagedChannelBuilder;
73
import io.grpc.Metadata;
74
import io.grpc.MethodDescriptor;
75
import io.grpc.NameResolver;
76
import io.grpc.NameResolver.ConfigOrError;
77
import io.grpc.NameResolver.ResolutionResult;
78
import io.grpc.NameResolverProvider;
79
import io.grpc.NameResolverRegistry;
80
import io.grpc.ProxyDetector;
81
import io.grpc.Status;
82
import io.grpc.SynchronizationContext;
83
import io.grpc.SynchronizationContext.ScheduledHandle;
84
import io.grpc.internal.AutoConfiguredLoadBalancerFactory.AutoConfiguredLoadBalancer;
85
import io.grpc.internal.ClientCallImpl.ClientStreamProvider;
86
import io.grpc.internal.ClientTransportFactory.SwapChannelCredentialsResult;
87
import io.grpc.internal.ManagedChannelImplBuilder.ClientTransportFactoryBuilder;
88
import io.grpc.internal.ManagedChannelImplBuilder.FixedPortProvider;
89
import io.grpc.internal.ManagedChannelServiceConfig.MethodInfo;
90
import io.grpc.internal.ManagedChannelServiceConfig.ServiceConfigConvertedSelector;
91
import io.grpc.internal.RetriableStream.ChannelBufferMeter;
92
import io.grpc.internal.RetriableStream.Throttle;
93
import io.grpc.internal.RetryingNameResolver.ResolutionResultListener;
94
import java.net.SocketAddress;
95
import java.net.URI;
96
import java.net.URISyntaxException;
97
import java.util.ArrayList;
98
import java.util.Collection;
99
import java.util.Collections;
100
import java.util.HashSet;
101
import java.util.LinkedHashSet;
102
import java.util.List;
103
import java.util.Map;
104
import java.util.Set;
105
import java.util.concurrent.Callable;
106
import java.util.concurrent.CountDownLatch;
107
import java.util.concurrent.ExecutionException;
108
import java.util.concurrent.Executor;
109
import java.util.concurrent.Future;
110
import java.util.concurrent.ScheduledExecutorService;
111
import java.util.concurrent.ScheduledFuture;
112
import java.util.concurrent.TimeUnit;
113
import java.util.concurrent.TimeoutException;
114
import java.util.concurrent.atomic.AtomicBoolean;
115
import java.util.concurrent.atomic.AtomicReference;
116
import java.util.logging.Level;
117
import java.util.logging.Logger;
118
import java.util.regex.Pattern;
119
import javax.annotation.Nullable;
120
import javax.annotation.concurrent.GuardedBy;
121
import javax.annotation.concurrent.ThreadSafe;
122

123
/** A communication channel for making outgoing RPCs. */
124
@ThreadSafe
125
final class ManagedChannelImpl extends ManagedChannel implements
126
    InternalInstrumented<ChannelStats> {
127
  @VisibleForTesting
128
  static final Logger logger = Logger.getLogger(ManagedChannelImpl.class.getName());
1✔
129

130
  // Matching this pattern means the target string is a URI target or at least intended to be one.
131
  // A URI target must be an absolute hierarchical URI.
132
  // From RFC 2396: scheme = alpha *( alpha | digit | "+" | "-" | "." )
133
  @VisibleForTesting
134
  static final Pattern URI_PATTERN = Pattern.compile("[a-zA-Z][a-zA-Z0-9+.-]*:/.*");
1✔
135

136
  static final long IDLE_TIMEOUT_MILLIS_DISABLE = -1;
137

138
  static final long SUBCHANNEL_SHUTDOWN_DELAY_SECONDS = 5;
139

140
  @VisibleForTesting
141
  static final Status SHUTDOWN_NOW_STATUS =
1✔
142
      Status.UNAVAILABLE.withDescription("Channel shutdownNow invoked");
1✔
143

144
  @VisibleForTesting
145
  static final Status SHUTDOWN_STATUS =
1✔
146
      Status.UNAVAILABLE.withDescription("Channel shutdown invoked");
1✔
147

148
  @VisibleForTesting
149
  static final Status SUBCHANNEL_SHUTDOWN_STATUS =
1✔
150
      Status.UNAVAILABLE.withDescription("Subchannel shutdown invoked");
1✔
151

152
  private static final ManagedChannelServiceConfig EMPTY_SERVICE_CONFIG =
153
      ManagedChannelServiceConfig.empty();
1✔
154
  private static final InternalConfigSelector INITIAL_PENDING_SELECTOR =
1✔
155
      new InternalConfigSelector() {
1✔
156
        @Override
157
        public Result selectConfig(PickSubchannelArgs args) {
158
          throw new IllegalStateException("Resolution is pending");
×
159
        }
160
      };
161
  private static final LoadBalancer.PickDetailsConsumer NOOP_PICK_DETAILS_CONSUMER =
1✔
162
      new LoadBalancer.PickDetailsConsumer() {};
1✔
163

164
  private final InternalLogId logId;
165
  private final String target;
166
  @Nullable
167
  private final String authorityOverride;
168
  private final NameResolverRegistry nameResolverRegistry;
169
  private final NameResolver.Args nameResolverArgs;
170
  private final AutoConfiguredLoadBalancerFactory loadBalancerFactory;
171
  private final ClientTransportFactory originalTransportFactory;
172
  @Nullable
173
  private final ChannelCredentials originalChannelCreds;
174
  private final ClientTransportFactory transportFactory;
175
  private final ClientTransportFactory oobTransportFactory;
176
  private final RestrictedScheduledExecutor scheduledExecutor;
177
  private final Executor executor;
178
  private final ObjectPool<? extends Executor> executorPool;
179
  private final ObjectPool<? extends Executor> balancerRpcExecutorPool;
180
  private final ExecutorHolder balancerRpcExecutorHolder;
181
  private final ExecutorHolder offloadExecutorHolder;
182
  private final TimeProvider timeProvider;
183
  private final int maxTraceEvents;
184

185
  @VisibleForTesting
1✔
186
  final SynchronizationContext syncContext = new SynchronizationContext(
187
      new Thread.UncaughtExceptionHandler() {
1✔
188
        @Override
189
        public void uncaughtException(Thread t, Throwable e) {
190
          logger.log(
1✔
191
              Level.SEVERE,
192
              "[" + getLogId() + "] Uncaught exception in the SynchronizationContext. Panic!",
1✔
193
              e);
194
          panic(e);
1✔
195
        }
1✔
196
      });
197

198
  private boolean fullStreamDecompression;
199

200
  private final DecompressorRegistry decompressorRegistry;
201
  private final CompressorRegistry compressorRegistry;
202

203
  private final Supplier<Stopwatch> stopwatchSupplier;
204
  /** The timeout before entering idle mode. */
205
  private final long idleTimeoutMillis;
206

207
  private final ConnectivityStateManager channelStateManager = new ConnectivityStateManager();
1✔
208
  private final BackoffPolicy.Provider backoffPolicyProvider;
209

210
  /**
211
   * We delegate to this channel, so that we can have interceptors as necessary. If there aren't
212
   * any interceptors and the {@link io.grpc.BinaryLog} is {@code null} then this will just be a
213
   * {@link RealChannel}.
214
   */
215
  private final Channel interceptorChannel;
216

217
  private final List<ClientTransportFilter> transportFilters;
218
  @Nullable private final String userAgent;
219

220
  // Only null after channel is terminated. Must be assigned from the syncContext.
221
  private NameResolver nameResolver;
222

223
  // Must be accessed from the syncContext.
224
  private boolean nameResolverStarted;
225

226
  // null when channel is in idle mode.  Must be assigned from syncContext.
227
  @Nullable
228
  private LbHelperImpl lbHelper;
229

230
  // Must ONLY be assigned from updateSubchannelPicker(), which is called from syncContext.
231
  // null if channel is in idle mode.
232
  @Nullable
233
  private volatile SubchannelPicker subchannelPicker;
234

235
  // Must be accessed from the syncContext
236
  private boolean panicMode;
237

238
  // Must be mutated from syncContext
239
  // If any monitoring hook to be added later needs to get a snapshot of this Set, we could
240
  // switch to a ConcurrentHashMap.
241
  private final Set<InternalSubchannel> subchannels = new HashSet<>(16, .75f);
1✔
242

243
  // Must be accessed from syncContext
244
  @Nullable
245
  private Collection<RealChannel.PendingCall<?, ?>> pendingCalls;
246
  private final Object pendingCallsInUseObject = new Object();
1✔
247

248
  // Must be mutated from syncContext
249
  private final Set<OobChannel> oobChannels = new HashSet<>(1, .75f);
1✔
250

251
  // reprocess() must be run from syncContext
252
  private final DelayedClientTransport delayedTransport;
253
  private final UncommittedRetriableStreamsRegistry uncommittedRetriableStreamsRegistry
1✔
254
      = new UncommittedRetriableStreamsRegistry();
255

256
  // Shutdown states.
257
  //
258
  // Channel's shutdown process:
259
  // 1. shutdown(): stop accepting new calls from applications
260
  //   1a shutdown <- true
261
  //   1b subchannelPicker <- null
262
  //   1c delayedTransport.shutdown()
263
  // 2. delayedTransport terminated: stop stream-creation functionality
264
  //   2a terminating <- true
265
  //   2b loadBalancer.shutdown()
266
  //     * LoadBalancer will shutdown subchannels and OOB channels
267
  //   2c loadBalancer <- null
268
  //   2d nameResolver.shutdown()
269
  //   2e nameResolver <- null
270
  // 3. All subchannels and OOB channels terminated: Channel considered terminated
271

272
  private final AtomicBoolean shutdown = new AtomicBoolean(false);
1✔
273
  // Must only be mutated and read from syncContext
274
  private boolean shutdownNowed;
275
  // Must only be mutated from syncContext
276
  private boolean terminating;
277
  // Must be mutated from syncContext
278
  private volatile boolean terminated;
279
  private final CountDownLatch terminatedLatch = new CountDownLatch(1);
1✔
280

281
  private final CallTracer.Factory callTracerFactory;
282
  private final CallTracer channelCallTracer;
283
  private final ChannelTracer channelTracer;
284
  private final ChannelLogger channelLogger;
285
  private final InternalChannelz channelz;
286
  private final RealChannel realChannel;
287
  // Must be mutated and read from syncContext
288
  // a flag for doing channel tracing when flipped
289
  private ResolutionState lastResolutionState = ResolutionState.NO_RESOLUTION;
1✔
290
  // Must be mutated and read from constructor or syncContext
291
  // used for channel tracing when value changed
292
  private ManagedChannelServiceConfig lastServiceConfig = EMPTY_SERVICE_CONFIG;
1✔
293

294
  @Nullable
295
  private final ManagedChannelServiceConfig defaultServiceConfig;
296
  // Must be mutated and read from constructor or syncContext
297
  private boolean serviceConfigUpdated = false;
1✔
298
  private final boolean lookUpServiceConfig;
299

300
  // One instance per channel.
301
  private final ChannelBufferMeter channelBufferUsed = new ChannelBufferMeter();
1✔
302

303
  private final long perRpcBufferLimit;
304
  private final long channelBufferLimit;
305

306
  // Temporary false flag that can skip the retry code path.
307
  private final boolean retryEnabled;
308

309
  private final Deadline.Ticker ticker = Deadline.getSystemTicker();
1✔
310

311
  // Called from syncContext
312
  private final ManagedClientTransport.Listener delayedTransportListener =
1✔
313
      new DelayedTransportListener();
314

315
  // Must be called from syncContext
316
  private void maybeShutdownNowSubchannels() {
317
    if (shutdownNowed) {
1✔
318
      for (InternalSubchannel subchannel : subchannels) {
1✔
319
        subchannel.shutdownNow(SHUTDOWN_NOW_STATUS);
1✔
320
      }
1✔
321
      for (OobChannel oobChannel : oobChannels) {
1✔
322
        oobChannel.getInternalSubchannel().shutdownNow(SHUTDOWN_NOW_STATUS);
1✔
323
      }
1✔
324
    }
325
  }
1✔
326

327
  // Must be accessed from syncContext
328
  @VisibleForTesting
1✔
329
  final InUseStateAggregator<Object> inUseStateAggregator = new IdleModeStateAggregator();
330

331
  @Override
332
  public ListenableFuture<ChannelStats> getStats() {
333
    final SettableFuture<ChannelStats> ret = SettableFuture.create();
1✔
334
    final class StatsFetcher implements Runnable {
1✔
335
      @Override
336
      public void run() {
337
        ChannelStats.Builder builder = new InternalChannelz.ChannelStats.Builder();
1✔
338
        channelCallTracer.updateBuilder(builder);
1✔
339
        channelTracer.updateBuilder(builder);
1✔
340
        builder.setTarget(target).setState(channelStateManager.getState());
1✔
341
        List<InternalWithLogId> children = new ArrayList<>();
1✔
342
        children.addAll(subchannels);
1✔
343
        children.addAll(oobChannels);
1✔
344
        builder.setSubchannels(children);
1✔
345
        ret.set(builder.build());
1✔
346
      }
1✔
347
    }
348

349
    // subchannels and oobchannels can only be accessed from syncContext
350
    syncContext.execute(new StatsFetcher());
1✔
351
    return ret;
1✔
352
  }
353

354
  @Override
355
  public InternalLogId getLogId() {
356
    return logId;
1✔
357
  }
358

359
  // Run from syncContext
360
  private class IdleModeTimer implements Runnable {
1✔
361

362
    @Override
363
    public void run() {
364
      // Workaround timer scheduled while in idle mode. This can happen from handleNotInUse() after
365
      // an explicit enterIdleMode() by the user. Protecting here as other locations are a bit too
366
      // subtle to change rapidly to resolve the channel panic. See #8714
367
      if (lbHelper == null) {
1✔
368
        return;
×
369
      }
370
      enterIdleMode();
1✔
371
    }
1✔
372
  }
373

374
  // Must be called from syncContext
375
  private void shutdownNameResolverAndLoadBalancer(boolean channelIsActive) {
376
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
377
    if (channelIsActive) {
1✔
378
      checkState(nameResolverStarted, "nameResolver is not started");
1✔
379
      checkState(lbHelper != null, "lbHelper is null");
1✔
380
    }
381
    if (nameResolver != null) {
1✔
382
      nameResolver.shutdown();
1✔
383
      nameResolverStarted = false;
1✔
384
      if (channelIsActive) {
1✔
385
        nameResolver = getNameResolver(
1✔
386
            target, authorityOverride, nameResolverRegistry, nameResolverArgs,
387
            transportFactory.getSupportedSocketAddressTypes());
1✔
388
      } else {
389
        nameResolver = null;
1✔
390
      }
391
    }
392
    if (lbHelper != null) {
1✔
393
      lbHelper.lb.shutdown();
1✔
394
      lbHelper = null;
1✔
395
    }
396
    subchannelPicker = null;
1✔
397
  }
1✔
398

399
  /**
400
   * Make the channel exit idle mode, if it's in it.
401
   *
402
   * <p>Must be called from syncContext
403
   */
404
  @VisibleForTesting
405
  void exitIdleMode() {
406
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
407
    if (shutdown.get() || panicMode) {
1✔
408
      return;
1✔
409
    }
410
    if (inUseStateAggregator.isInUse()) {
1✔
411
      // Cancel the timer now, so that a racing due timer will not put Channel on idleness
412
      // when the caller of exitIdleMode() is about to use the returned loadBalancer.
413
      cancelIdleTimer(false);
1✔
414
    } else {
415
      // exitIdleMode() may be called outside of inUseStateAggregator.handleNotInUse() while
416
      // isInUse() == false, in which case we still need to schedule the timer.
417
      rescheduleIdleTimer();
1✔
418
    }
419
    if (lbHelper != null) {
1✔
420
      return;
1✔
421
    }
422
    channelLogger.log(ChannelLogLevel.INFO, "Exiting idle mode");
1✔
423
    LbHelperImpl lbHelper = new LbHelperImpl();
1✔
424
    lbHelper.lb = loadBalancerFactory.newLoadBalancer(lbHelper);
1✔
425
    // Delay setting lbHelper until fully initialized, since loadBalancerFactory is user code and
426
    // may throw. We don't want to confuse our state, even if we will enter panic mode.
427
    this.lbHelper = lbHelper;
1✔
428

429
    channelStateManager.gotoState(CONNECTING);
1✔
430
    NameResolverListener listener = new NameResolverListener(lbHelper, nameResolver);
1✔
431
    nameResolver.start(listener);
1✔
432
    nameResolverStarted = true;
1✔
433
  }
1✔
434

435
  // Must be run from syncContext
436
  private void enterIdleMode() {
437
    // nameResolver and loadBalancer are guaranteed to be non-null.  If any of them were null,
438
    // either the idleModeTimer ran twice without exiting the idle mode, or the task in shutdown()
439
    // did not cancel idleModeTimer, or enterIdle() ran while shutdown or in idle, all of
440
    // which are bugs.
441
    shutdownNameResolverAndLoadBalancer(true);
1✔
442
    delayedTransport.reprocess(null);
1✔
443
    channelLogger.log(ChannelLogLevel.INFO, "Entering IDLE state");
1✔
444
    channelStateManager.gotoState(IDLE);
1✔
445
    // If the inUseStateAggregator still considers pending calls to be queued up or the delayed
446
    // transport to be holding some we need to exit idle mode to give these calls a chance to
447
    // be processed.
448
    if (inUseStateAggregator.anyObjectInUse(pendingCallsInUseObject, delayedTransport)) {
1✔
449
      exitIdleMode();
1✔
450
    }
451
  }
1✔
452

453
  // Must be run from syncContext
454
  private void cancelIdleTimer(boolean permanent) {
455
    idleTimer.cancel(permanent);
1✔
456
  }
1✔
457

458
  // Always run from syncContext
459
  private void rescheduleIdleTimer() {
460
    if (idleTimeoutMillis == IDLE_TIMEOUT_MILLIS_DISABLE) {
1✔
461
      return;
1✔
462
    }
463
    idleTimer.reschedule(idleTimeoutMillis, TimeUnit.MILLISECONDS);
1✔
464
  }
1✔
465

466
  /**
467
   * Force name resolution refresh to happen immediately. Must be run
468
   * from syncContext.
469
   */
470
  private void refreshNameResolution() {
471
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
472
    if (nameResolverStarted) {
1✔
473
      nameResolver.refresh();
1✔
474
    }
475
  }
1✔
476

477
  private final class ChannelStreamProvider implements ClientStreamProvider {
1✔
478
    volatile Throttle throttle;
479

480
    private ClientTransport getTransport(PickSubchannelArgs args) {
481
      SubchannelPicker pickerCopy = subchannelPicker;
1✔
482
      if (shutdown.get()) {
1✔
483
        // If channel is shut down, delayedTransport is also shut down which will fail the stream
484
        // properly.
485
        return delayedTransport;
1✔
486
      }
487
      if (pickerCopy == null) {
1✔
488
        final class ExitIdleModeForTransport implements Runnable {
1✔
489
          @Override
490
          public void run() {
491
            exitIdleMode();
1✔
492
          }
1✔
493
        }
494

495
        syncContext.execute(new ExitIdleModeForTransport());
1✔
496
        return delayedTransport;
1✔
497
      }
498
      // There is no need to reschedule the idle timer here.
499
      //
500
      // pickerCopy != null, which means idle timer has not expired when this method starts.
501
      // Even if idle timer expires right after we grab pickerCopy, and it shuts down LoadBalancer
502
      // which calls Subchannel.shutdown(), the InternalSubchannel will be actually shutdown after
503
      // SUBCHANNEL_SHUTDOWN_DELAY_SECONDS, which gives the caller time to start RPC on it.
504
      //
505
      // In most cases the idle timer is scheduled to fire after the transport has created the
506
      // stream, which would have reported in-use state to the channel that would have cancelled
507
      // the idle timer.
508
      PickResult pickResult = pickerCopy.pickSubchannel(args);
1✔
509
      ClientTransport transport = GrpcUtil.getTransportFromPickResult(
1✔
510
          pickResult, args.getCallOptions().isWaitForReady());
1✔
511
      if (transport != null) {
1✔
512
        return transport;
1✔
513
      }
514
      return delayedTransport;
1✔
515
    }
516

517
    @Override
518
    public ClientStream newStream(
519
        final MethodDescriptor<?, ?> method,
520
        final CallOptions callOptions,
521
        final Metadata headers,
522
        final Context context) {
523
      if (!retryEnabled) {
1✔
524
        ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers(
1✔
525
            callOptions, headers, 0, /* isTransparentRetry= */ false);
526
        ClientTransport transport = getTransport(new PickSubchannelArgsImpl(
1✔
527
            method, headers, callOptions, new PickDetailsConsumerImpl(tracers)));
528
        Context origContext = context.attach();
1✔
529
        try {
530
          return transport.newStream(method, headers, callOptions, tracers);
1✔
531
        } finally {
532
          context.detach(origContext);
1✔
533
        }
534
      } else {
535
        MethodInfo methodInfo = callOptions.getOption(MethodInfo.KEY);
1✔
536
        final RetryPolicy retryPolicy = methodInfo == null ? null : methodInfo.retryPolicy;
1✔
537
        final HedgingPolicy hedgingPolicy = methodInfo == null ? null : methodInfo.hedgingPolicy;
1✔
538
        final class RetryStream<ReqT> extends RetriableStream<ReqT> {
539
          @SuppressWarnings("unchecked")
540
          RetryStream() {
1✔
541
            super(
1✔
542
                (MethodDescriptor<ReqT, ?>) method,
543
                headers,
544
                channelBufferUsed,
1✔
545
                perRpcBufferLimit,
1✔
546
                channelBufferLimit,
1✔
547
                getCallExecutor(callOptions),
1✔
548
                transportFactory.getScheduledExecutorService(),
1✔
549
                retryPolicy,
550
                hedgingPolicy,
551
                throttle);
552
          }
1✔
553

554
          @Override
555
          Status prestart() {
556
            return uncommittedRetriableStreamsRegistry.add(this);
1✔
557
          }
558

559
          @Override
560
          void postCommit() {
561
            uncommittedRetriableStreamsRegistry.remove(this);
1✔
562
          }
1✔
563

564
          @Override
565
          ClientStream newSubstream(
566
              Metadata newHeaders, ClientStreamTracer.Factory factory, int previousAttempts,
567
              boolean isTransparentRetry) {
568
            CallOptions newOptions = callOptions.withStreamTracerFactory(factory);
1✔
569
            ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers(
1✔
570
                newOptions, newHeaders, previousAttempts, isTransparentRetry);
571
            ClientTransport transport = getTransport(new PickSubchannelArgsImpl(
1✔
572
                method, newHeaders, newOptions, new PickDetailsConsumerImpl(tracers)));
573
            Context origContext = context.attach();
1✔
574
            try {
575
              return transport.newStream(method, newHeaders, newOptions, tracers);
1✔
576
            } finally {
577
              context.detach(origContext);
1✔
578
            }
579
          }
580
        }
581

582
        return new RetryStream<>();
1✔
583
      }
584
    }
585
  }
586

587
  private final ChannelStreamProvider transportProvider = new ChannelStreamProvider();
1✔
588

589
  private final Rescheduler idleTimer;
590

591
  ManagedChannelImpl(
592
      ManagedChannelImplBuilder builder,
593
      ClientTransportFactory clientTransportFactory,
594
      BackoffPolicy.Provider backoffPolicyProvider,
595
      ObjectPool<? extends Executor> balancerRpcExecutorPool,
596
      Supplier<Stopwatch> stopwatchSupplier,
597
      List<ClientInterceptor> interceptors,
598
      final TimeProvider timeProvider) {
1✔
599
    this.target = checkNotNull(builder.target, "target");
1✔
600
    this.logId = InternalLogId.allocate("Channel", target);
1✔
601
    this.timeProvider = checkNotNull(timeProvider, "timeProvider");
1✔
602
    this.executorPool = checkNotNull(builder.executorPool, "executorPool");
1✔
603
    this.executor = checkNotNull(executorPool.getObject(), "executor");
1✔
604
    this.originalChannelCreds = builder.channelCredentials;
1✔
605
    this.originalTransportFactory = clientTransportFactory;
1✔
606
    this.offloadExecutorHolder =
1✔
607
        new ExecutorHolder(checkNotNull(builder.offloadExecutorPool, "offloadExecutorPool"));
1✔
608
    this.transportFactory = new CallCredentialsApplyingTransportFactory(
1✔
609
        clientTransportFactory, builder.callCredentials, this.offloadExecutorHolder);
610
    this.oobTransportFactory = new CallCredentialsApplyingTransportFactory(
1✔
611
        clientTransportFactory, null, this.offloadExecutorHolder);
612
    this.scheduledExecutor =
1✔
613
        new RestrictedScheduledExecutor(transportFactory.getScheduledExecutorService());
1✔
614
    maxTraceEvents = builder.maxTraceEvents;
1✔
615
    channelTracer = new ChannelTracer(
1✔
616
        logId, builder.maxTraceEvents, timeProvider.currentTimeNanos(),
1✔
617
        "Channel for '" + target + "'");
618
    channelLogger = new ChannelLoggerImpl(channelTracer, timeProvider);
1✔
619
    ProxyDetector proxyDetector =
620
        builder.proxyDetector != null ? builder.proxyDetector : GrpcUtil.DEFAULT_PROXY_DETECTOR;
1✔
621
    this.retryEnabled = builder.retryEnabled;
1✔
622
    this.loadBalancerFactory = new AutoConfiguredLoadBalancerFactory(builder.defaultLbPolicy);
1✔
623
    this.nameResolverRegistry = builder.nameResolverRegistry;
1✔
624
    ScParser serviceConfigParser =
1✔
625
        new ScParser(
626
            retryEnabled,
627
            builder.maxRetryAttempts,
628
            builder.maxHedgedAttempts,
629
            loadBalancerFactory);
630
    this.authorityOverride = builder.authorityOverride;
1✔
631
    this.nameResolverArgs =
1✔
632
        NameResolver.Args.newBuilder()
1✔
633
            .setDefaultPort(builder.getDefaultPort())
1✔
634
            .setProxyDetector(proxyDetector)
1✔
635
            .setSynchronizationContext(syncContext)
1✔
636
            .setScheduledExecutorService(scheduledExecutor)
1✔
637
            .setServiceConfigParser(serviceConfigParser)
1✔
638
            .setChannelLogger(channelLogger)
1✔
639
            .setOffloadExecutor(this.offloadExecutorHolder)
1✔
640
            .setOverrideAuthority(this.authorityOverride)
1✔
641
            .build();
1✔
642
    this.nameResolver = getNameResolver(
1✔
643
        target, authorityOverride, nameResolverRegistry, nameResolverArgs,
644
        transportFactory.getSupportedSocketAddressTypes());
1✔
645
    this.balancerRpcExecutorPool = checkNotNull(balancerRpcExecutorPool, "balancerRpcExecutorPool");
1✔
646
    this.balancerRpcExecutorHolder = new ExecutorHolder(balancerRpcExecutorPool);
1✔
647
    this.delayedTransport = new DelayedClientTransport(this.executor, this.syncContext);
1✔
648
    this.delayedTransport.start(delayedTransportListener);
1✔
649
    this.backoffPolicyProvider = backoffPolicyProvider;
1✔
650

651
    if (builder.defaultServiceConfig != null) {
1✔
652
      ConfigOrError parsedDefaultServiceConfig =
1✔
653
          serviceConfigParser.parseServiceConfig(builder.defaultServiceConfig);
1✔
654
      checkState(
1✔
655
          parsedDefaultServiceConfig.getError() == null,
1✔
656
          "Default config is invalid: %s",
657
          parsedDefaultServiceConfig.getError());
1✔
658
      this.defaultServiceConfig =
1✔
659
          (ManagedChannelServiceConfig) parsedDefaultServiceConfig.getConfig();
1✔
660
      this.lastServiceConfig = this.defaultServiceConfig;
1✔
661
    } else {
1✔
662
      this.defaultServiceConfig = null;
1✔
663
    }
664
    this.lookUpServiceConfig = builder.lookUpServiceConfig;
1✔
665
    realChannel = new RealChannel(nameResolver.getServiceAuthority());
1✔
666
    Channel channel = realChannel;
1✔
667
    if (builder.binlog != null) {
1✔
668
      channel = builder.binlog.wrapChannel(channel);
1✔
669
    }
670
    this.interceptorChannel = ClientInterceptors.intercept(channel, interceptors);
1✔
671
    this.transportFilters = new ArrayList<>(builder.transportFilters);
1✔
672
    this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier");
1✔
673
    if (builder.idleTimeoutMillis == IDLE_TIMEOUT_MILLIS_DISABLE) {
1✔
674
      this.idleTimeoutMillis = builder.idleTimeoutMillis;
1✔
675
    } else {
676
      checkArgument(
1✔
677
          builder.idleTimeoutMillis
678
              >= ManagedChannelImplBuilder.IDLE_MODE_MIN_TIMEOUT_MILLIS,
679
          "invalid idleTimeoutMillis %s", builder.idleTimeoutMillis);
680
      this.idleTimeoutMillis = builder.idleTimeoutMillis;
1✔
681
    }
682

683
    idleTimer = new Rescheduler(
1✔
684
        new IdleModeTimer(),
685
        syncContext,
686
        transportFactory.getScheduledExecutorService(),
1✔
687
        stopwatchSupplier.get());
1✔
688
    this.fullStreamDecompression = builder.fullStreamDecompression;
1✔
689
    this.decompressorRegistry = checkNotNull(builder.decompressorRegistry, "decompressorRegistry");
1✔
690
    this.compressorRegistry = checkNotNull(builder.compressorRegistry, "compressorRegistry");
1✔
691
    this.userAgent = builder.userAgent;
1✔
692

693
    this.channelBufferLimit = builder.retryBufferSize;
1✔
694
    this.perRpcBufferLimit = builder.perRpcBufferLimit;
1✔
695
    final class ChannelCallTracerFactory implements CallTracer.Factory {
1✔
696
      @Override
697
      public CallTracer create() {
698
        return new CallTracer(timeProvider);
1✔
699
      }
700
    }
701

702
    this.callTracerFactory = new ChannelCallTracerFactory();
1✔
703
    channelCallTracer = callTracerFactory.create();
1✔
704
    this.channelz = checkNotNull(builder.channelz);
1✔
705
    channelz.addRootChannel(this);
1✔
706

707
    if (!lookUpServiceConfig) {
1✔
708
      if (defaultServiceConfig != null) {
1✔
709
        channelLogger.log(
1✔
710
            ChannelLogLevel.INFO, "Service config look-up disabled, using default service config");
711
      }
712
      serviceConfigUpdated = true;
1✔
713
    }
714
  }
1✔
715

716
  private static NameResolver getNameResolver(
717
      String target, NameResolverRegistry nameResolverRegistry, NameResolver.Args nameResolverArgs,
718
      Collection<Class<? extends SocketAddress>> channelTransportSocketAddressTypes) {
719
    // Finding a NameResolver. Try using the target string as the URI. If that fails, try prepending
720
    // "dns:///".
721
    NameResolverProvider provider = null;
1✔
722
    URI targetUri = null;
1✔
723
    StringBuilder uriSyntaxErrors = new StringBuilder();
1✔
724
    try {
725
      targetUri = new URI(target);
1✔
726
    } catch (URISyntaxException e) {
1✔
727
      // Can happen with ip addresses like "[::1]:1234" or 127.0.0.1:1234.
728
      uriSyntaxErrors.append(e.getMessage());
1✔
729
    }
1✔
730
    if (targetUri != null) {
1✔
731
      // For "localhost:8080" this would likely cause provider to be null, because "localhost" is
732
      // parsed as the scheme. Will hit the next case and try "dns:///localhost:8080".
733
      provider = nameResolverRegistry.getProviderForScheme(targetUri.getScheme());
1✔
734
    }
735

736
    if (provider == null && !URI_PATTERN.matcher(target).matches()) {
1✔
737
      // It doesn't look like a URI target. Maybe it's an authority string. Try with the default
738
      // scheme from the registry.
739
      try {
740
        targetUri = new URI(nameResolverRegistry.getDefaultScheme(), "", "/" + target, null);
1✔
741
      } catch (URISyntaxException e) {
×
742
        // Should not be possible.
743
        throw new IllegalArgumentException(e);
×
744
      }
1✔
745
      provider = nameResolverRegistry.getProviderForScheme(targetUri.getScheme());
1✔
746
    }
747

748
    if (provider == null) {
1✔
749
      throw new IllegalArgumentException(String.format(
1✔
750
          "Could not find a NameResolverProvider for %s%s",
751
          target, uriSyntaxErrors.length() > 0 ? " (" + uriSyntaxErrors + ")" : ""));
1✔
752
    }
753

754
    if (channelTransportSocketAddressTypes != null) {
1✔
755
      Collection<Class<? extends SocketAddress>> nameResolverSocketAddressTypes
1✔
756
          = provider.getProducedSocketAddressTypes();
1✔
757
      if (!channelTransportSocketAddressTypes.containsAll(nameResolverSocketAddressTypes)) {
1✔
758
        throw new IllegalArgumentException(String.format(
1✔
759
            "Address types of NameResolver '%s' for '%s' not supported by transport",
760
            targetUri.getScheme(), target));
1✔
761
      }
762
    }
763

764
    NameResolver resolver = provider.newNameResolver(targetUri, nameResolverArgs);
1✔
765
    if (resolver != null) {
1✔
766
      return resolver;
1✔
767
    }
768

769
    throw new IllegalArgumentException(String.format(
1✔
770
        "cannot create a NameResolver for %s%s",
771
        target, uriSyntaxErrors.length() > 0 ? " (" + uriSyntaxErrors + ")" : ""));
1✔
772
  }
773

774
  @VisibleForTesting
775
  static NameResolver getNameResolver(
776
      String target, @Nullable final String overrideAuthority,
777
      NameResolverRegistry nameResolverRegistry, NameResolver.Args nameResolverArgs,
778
      Collection<Class<? extends SocketAddress>> channelTransportSocketAddressTypes) {
779
    NameResolver resolver = getNameResolver(target, nameResolverRegistry, nameResolverArgs,
1✔
780
        channelTransportSocketAddressTypes);
781

782
    // We wrap the name resolver in a RetryingNameResolver to give it the ability to retry failures.
783
    // TODO: After a transition period, all NameResolver implementations that need retry should use
784
    //       RetryingNameResolver directly and this step can be removed.
785
    NameResolver usedNameResolver = new RetryingNameResolver(resolver,
1✔
786
          new BackoffPolicyRetryScheduler(new ExponentialBackoffPolicy.Provider(),
787
              nameResolverArgs.getScheduledExecutorService(),
1✔
788
              nameResolverArgs.getSynchronizationContext()),
1✔
789
          nameResolverArgs.getSynchronizationContext());
1✔
790

791
    if (overrideAuthority == null) {
1✔
792
      return usedNameResolver;
1✔
793
    }
794

795
    return new ForwardingNameResolver(usedNameResolver) {
1✔
796
      @Override
797
      public String getServiceAuthority() {
798
        return overrideAuthority;
1✔
799
      }
800
    };
801
  }
802

803
  @VisibleForTesting
804
  InternalConfigSelector getConfigSelector() {
805
    return realChannel.configSelector.get();
1✔
806
  }
807

808
  /**
809
   * Initiates an orderly shutdown in which preexisting calls continue but new calls are immediately
810
   * cancelled.
811
   */
812
  @Override
813
  public ManagedChannelImpl shutdown() {
814
    channelLogger.log(ChannelLogLevel.DEBUG, "shutdown() called");
1✔
815
    if (!shutdown.compareAndSet(false, true)) {
1✔
816
      return this;
1✔
817
    }
818
    final class Shutdown implements Runnable {
1✔
819
      @Override
820
      public void run() {
821
        channelLogger.log(ChannelLogLevel.INFO, "Entering SHUTDOWN state");
1✔
822
        channelStateManager.gotoState(SHUTDOWN);
1✔
823
      }
1✔
824
    }
825

826
    syncContext.execute(new Shutdown());
1✔
827
    realChannel.shutdown();
1✔
828
    final class CancelIdleTimer implements Runnable {
1✔
829
      @Override
830
      public void run() {
831
        cancelIdleTimer(/* permanent= */ true);
1✔
832
      }
1✔
833
    }
834

835
    syncContext.execute(new CancelIdleTimer());
1✔
836
    return this;
1✔
837
  }
838

839
  /**
840
   * Initiates a forceful shutdown in which preexisting and new calls are cancelled. Although
841
   * forceful, the shutdown process is still not instantaneous; {@link #isTerminated()} will likely
842
   * return {@code false} immediately after this method returns.
843
   */
844
  @Override
845
  public ManagedChannelImpl shutdownNow() {
846
    channelLogger.log(ChannelLogLevel.DEBUG, "shutdownNow() called");
1✔
847
    shutdown();
1✔
848
    realChannel.shutdownNow();
1✔
849
    final class ShutdownNow implements Runnable {
1✔
850
      @Override
851
      public void run() {
852
        if (shutdownNowed) {
1✔
853
          return;
1✔
854
        }
855
        shutdownNowed = true;
1✔
856
        maybeShutdownNowSubchannels();
1✔
857
      }
1✔
858
    }
859

860
    syncContext.execute(new ShutdownNow());
1✔
861
    return this;
1✔
862
  }
863

864
  // Called from syncContext
865
  @VisibleForTesting
866
  void panic(final Throwable t) {
867
    if (panicMode) {
1✔
868
      // Preserve the first panic information
869
      return;
×
870
    }
871
    panicMode = true;
1✔
872
    cancelIdleTimer(/* permanent= */ true);
1✔
873
    shutdownNameResolverAndLoadBalancer(false);
1✔
874
    final class PanicSubchannelPicker extends SubchannelPicker {
1✔
875
      private final PickResult panicPickResult =
1✔
876
          PickResult.withDrop(
1✔
877
              Status.INTERNAL.withDescription("Panic! This is a bug!").withCause(t));
1✔
878

879
      @Override
880
      public PickResult pickSubchannel(PickSubchannelArgs args) {
881
        return panicPickResult;
1✔
882
      }
883

884
      @Override
885
      public String toString() {
886
        return MoreObjects.toStringHelper(PanicSubchannelPicker.class)
×
887
            .add("panicPickResult", panicPickResult)
×
888
            .toString();
×
889
      }
890
    }
891

892
    updateSubchannelPicker(new PanicSubchannelPicker());
1✔
893
    realChannel.updateConfigSelector(null);
1✔
894
    channelLogger.log(ChannelLogLevel.ERROR, "PANIC! Entering TRANSIENT_FAILURE");
1✔
895
    channelStateManager.gotoState(TRANSIENT_FAILURE);
1✔
896
  }
1✔
897

898
  @VisibleForTesting
899
  boolean isInPanicMode() {
900
    return panicMode;
1✔
901
  }
902

903
  // Called from syncContext
904
  private void updateSubchannelPicker(SubchannelPicker newPicker) {
905
    subchannelPicker = newPicker;
1✔
906
    delayedTransport.reprocess(newPicker);
1✔
907
  }
1✔
908

909
  @Override
910
  public boolean isShutdown() {
911
    return shutdown.get();
1✔
912
  }
913

914
  @Override
915
  public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
916
    return terminatedLatch.await(timeout, unit);
1✔
917
  }
918

919
  @Override
920
  public boolean isTerminated() {
921
    return terminated;
1✔
922
  }
923

924
  /*
925
   * Creates a new outgoing call on the channel.
926
   */
927
  @Override
928
  public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(MethodDescriptor<ReqT, RespT> method,
929
      CallOptions callOptions) {
930
    return interceptorChannel.newCall(method, callOptions);
1✔
931
  }
932

933
  @Override
934
  public String authority() {
935
    return interceptorChannel.authority();
1✔
936
  }
937

938
  private Executor getCallExecutor(CallOptions callOptions) {
939
    Executor executor = callOptions.getExecutor();
1✔
940
    if (executor == null) {
1✔
941
      executor = this.executor;
1✔
942
    }
943
    return executor;
1✔
944
  }
945

946
  private class RealChannel extends Channel {
947
    // Reference to null if no config selector is available from resolution result
948
    // Reference must be set() from syncContext
949
    private final AtomicReference<InternalConfigSelector> configSelector =
1✔
950
        new AtomicReference<>(INITIAL_PENDING_SELECTOR);
1✔
951
    // Set when the NameResolver is initially created. When we create a new NameResolver for the
952
    // same target, the new instance must have the same value.
953
    private final String authority;
954

955
    private final Channel clientCallImplChannel = new Channel() {
1✔
956
      @Override
957
      public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
958
          MethodDescriptor<RequestT, ResponseT> method, CallOptions callOptions) {
959
        return new ClientCallImpl<>(
1✔
960
            method,
961
            getCallExecutor(callOptions),
1✔
962
            callOptions,
963
            transportProvider,
1✔
964
            terminated ? null : transportFactory.getScheduledExecutorService(),
1✔
965
            channelCallTracer,
1✔
966
            null)
967
            .setFullStreamDecompression(fullStreamDecompression)
1✔
968
            .setDecompressorRegistry(decompressorRegistry)
1✔
969
            .setCompressorRegistry(compressorRegistry);
1✔
970
      }
971

972
      @Override
973
      public String authority() {
974
        return authority;
×
975
      }
976
    };
977

978
    private RealChannel(String authority) {
1✔
979
      this.authority =  checkNotNull(authority, "authority");
1✔
980
    }
1✔
981

982
    @Override
983
    public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(
984
        MethodDescriptor<ReqT, RespT> method, CallOptions callOptions) {
985
      if (configSelector.get() != INITIAL_PENDING_SELECTOR) {
1✔
986
        return newClientCall(method, callOptions);
1✔
987
      }
988
      syncContext.execute(new Runnable() {
1✔
989
        @Override
990
        public void run() {
991
          exitIdleMode();
1✔
992
        }
1✔
993
      });
994
      if (configSelector.get() != INITIAL_PENDING_SELECTOR) {
1✔
995
        // This is an optimization for the case (typically with InProcessTransport) when name
996
        // resolution result is immediately available at this point. Otherwise, some users'
997
        // tests might observe slight behavior difference from earlier grpc versions.
998
        return newClientCall(method, callOptions);
1✔
999
      }
1000
      if (shutdown.get()) {
1✔
1001
        // Return a failing ClientCall.
1002
        return new ClientCall<ReqT, RespT>() {
×
1003
          @Override
1004
          public void start(Listener<RespT> responseListener, Metadata headers) {
1005
            responseListener.onClose(SHUTDOWN_STATUS, new Metadata());
×
1006
          }
×
1007

1008
          @Override public void request(int numMessages) {}
×
1009

1010
          @Override public void cancel(@Nullable String message, @Nullable Throwable cause) {}
×
1011

1012
          @Override public void halfClose() {}
×
1013

1014
          @Override public void sendMessage(ReqT message) {}
×
1015
        };
1016
      }
1017
      Context context = Context.current();
1✔
1018
      final PendingCall<ReqT, RespT> pendingCall = new PendingCall<>(context, method, callOptions);
1✔
1019
      syncContext.execute(new Runnable() {
1✔
1020
        @Override
1021
        public void run() {
1022
          if (configSelector.get() == INITIAL_PENDING_SELECTOR) {
1✔
1023
            if (pendingCalls == null) {
1✔
1024
              pendingCalls = new LinkedHashSet<>();
1✔
1025
              inUseStateAggregator.updateObjectInUse(pendingCallsInUseObject, true);
1✔
1026
            }
1027
            pendingCalls.add(pendingCall);
1✔
1028
          } else {
1029
            pendingCall.reprocess();
1✔
1030
          }
1031
        }
1✔
1032
      });
1033
      return pendingCall;
1✔
1034
    }
1035

1036
    // Must run in SynchronizationContext.
1037
    void updateConfigSelector(@Nullable InternalConfigSelector config) {
1038
      InternalConfigSelector prevConfig = configSelector.get();
1✔
1039
      configSelector.set(config);
1✔
1040
      if (prevConfig == INITIAL_PENDING_SELECTOR && pendingCalls != null) {
1✔
1041
        for (RealChannel.PendingCall<?, ?> pendingCall : pendingCalls) {
1✔
1042
          pendingCall.reprocess();
1✔
1043
        }
1✔
1044
      }
1045
    }
1✔
1046

1047
    // Must run in SynchronizationContext.
1048
    void onConfigError() {
1049
      if (configSelector.get() == INITIAL_PENDING_SELECTOR) {
1✔
1050
        updateConfigSelector(null);
1✔
1051
      }
1052
    }
1✔
1053

1054
    void shutdown() {
1055
      final class RealChannelShutdown implements Runnable {
1✔
1056
        @Override
1057
        public void run() {
1058
          if (pendingCalls == null) {
1✔
1059
            if (configSelector.get() == INITIAL_PENDING_SELECTOR) {
1✔
1060
              configSelector.set(null);
1✔
1061
            }
1062
            uncommittedRetriableStreamsRegistry.onShutdown(SHUTDOWN_STATUS);
1✔
1063
          }
1064
        }
1✔
1065
      }
1066

1067
      syncContext.execute(new RealChannelShutdown());
1✔
1068
    }
1✔
1069

1070
    void shutdownNow() {
1071
      final class RealChannelShutdownNow implements Runnable {
1✔
1072
        @Override
1073
        public void run() {
1074
          if (configSelector.get() == INITIAL_PENDING_SELECTOR) {
1✔
1075
            configSelector.set(null);
1✔
1076
          }
1077
          if (pendingCalls != null) {
1✔
1078
            for (RealChannel.PendingCall<?, ?> pendingCall : pendingCalls) {
1✔
1079
              pendingCall.cancel("Channel is forcefully shutdown", null);
1✔
1080
            }
1✔
1081
          }
1082
          uncommittedRetriableStreamsRegistry.onShutdownNow(SHUTDOWN_NOW_STATUS);
1✔
1083
        }
1✔
1084
      }
1085

1086
      syncContext.execute(new RealChannelShutdownNow());
1✔
1087
    }
1✔
1088

1089
    @Override
1090
    public String authority() {
1091
      return authority;
1✔
1092
    }
1093

1094
    private final class PendingCall<ReqT, RespT> extends DelayedClientCall<ReqT, RespT> {
1095
      final Context context;
1096
      final MethodDescriptor<ReqT, RespT> method;
1097
      final CallOptions callOptions;
1098
      private final long callCreationTime;
1099

1100
      PendingCall(
1101
          Context context, MethodDescriptor<ReqT, RespT> method, CallOptions callOptions) {
1✔
1102
        super(getCallExecutor(callOptions), scheduledExecutor, callOptions.getDeadline());
1✔
1103
        this.context = context;
1✔
1104
        this.method = method;
1✔
1105
        this.callOptions = callOptions;
1✔
1106
        this.callCreationTime = ticker.nanoTime();
1✔
1107
      }
1✔
1108

1109
      /** Called when it's ready to create a real call and reprocess the pending call. */
1110
      void reprocess() {
1111
        ClientCall<ReqT, RespT> realCall;
1112
        Context previous = context.attach();
1✔
1113
        try {
1114
          CallOptions delayResolutionOption = callOptions.withOption(NAME_RESOLUTION_DELAYED,
1✔
1115
              ticker.nanoTime() - callCreationTime);
1✔
1116
          realCall = newClientCall(method, delayResolutionOption);
1✔
1117
        } finally {
1118
          context.detach(previous);
1✔
1119
        }
1120
        Runnable toRun = setCall(realCall);
1✔
1121
        if (toRun == null) {
1✔
1122
          syncContext.execute(new PendingCallRemoval());
1✔
1123
        } else {
1124
          getCallExecutor(callOptions).execute(new Runnable() {
1✔
1125
            @Override
1126
            public void run() {
1127
              toRun.run();
1✔
1128
              syncContext.execute(new PendingCallRemoval());
1✔
1129
            }
1✔
1130
          });
1131
        }
1132
      }
1✔
1133

1134
      @Override
1135
      protected void callCancelled() {
1136
        super.callCancelled();
1✔
1137
        syncContext.execute(new PendingCallRemoval());
1✔
1138
      }
1✔
1139

1140
      final class PendingCallRemoval implements Runnable {
1✔
1141
        @Override
1142
        public void run() {
1143
          if (pendingCalls != null) {
1✔
1144
            pendingCalls.remove(PendingCall.this);
1✔
1145
            if (pendingCalls.isEmpty()) {
1✔
1146
              inUseStateAggregator.updateObjectInUse(pendingCallsInUseObject, false);
1✔
1147
              pendingCalls = null;
1✔
1148
              if (shutdown.get()) {
1✔
1149
                uncommittedRetriableStreamsRegistry.onShutdown(SHUTDOWN_STATUS);
1✔
1150
              }
1151
            }
1152
          }
1153
        }
1✔
1154
      }
1155
    }
1156

1157
    private <ReqT, RespT> ClientCall<ReqT, RespT> newClientCall(
1158
        MethodDescriptor<ReqT, RespT> method, CallOptions callOptions) {
1159
      InternalConfigSelector selector = configSelector.get();
1✔
1160
      if (selector == null) {
1✔
1161
        return clientCallImplChannel.newCall(method, callOptions);
1✔
1162
      }
1163
      if (selector instanceof ServiceConfigConvertedSelector) {
1✔
1164
        MethodInfo methodInfo =
1✔
1165
            ((ServiceConfigConvertedSelector) selector).config.getMethodConfig(method);
1✔
1166
        if (methodInfo != null) {
1✔
1167
          callOptions = callOptions.withOption(MethodInfo.KEY, methodInfo);
1✔
1168
        }
1169
        return clientCallImplChannel.newCall(method, callOptions);
1✔
1170
      }
1171
      return new ConfigSelectingClientCall<>(
1✔
1172
          selector, clientCallImplChannel, executor, method, callOptions);
1✔
1173
    }
1174
  }
1175

1176
  /**
1177
   * A client call for a given channel that applies a given config selector when it starts.
1178
   */
1179
  static final class ConfigSelectingClientCall<ReqT, RespT>
1180
      extends ForwardingClientCall<ReqT, RespT> {
1181

1182
    private final InternalConfigSelector configSelector;
1183
    private final Channel channel;
1184
    private final Executor callExecutor;
1185
    private final MethodDescriptor<ReqT, RespT> method;
1186
    private final Context context;
1187
    private CallOptions callOptions;
1188

1189
    private ClientCall<ReqT, RespT> delegate;
1190

1191
    ConfigSelectingClientCall(
1192
        InternalConfigSelector configSelector, Channel channel, Executor channelExecutor,
1193
        MethodDescriptor<ReqT, RespT> method,
1194
        CallOptions callOptions) {
1✔
1195
      this.configSelector = configSelector;
1✔
1196
      this.channel = channel;
1✔
1197
      this.method = method;
1✔
1198
      this.callExecutor =
1✔
1199
          callOptions.getExecutor() == null ? channelExecutor : callOptions.getExecutor();
1✔
1200
      this.callOptions = callOptions.withExecutor(callExecutor);
1✔
1201
      this.context = Context.current();
1✔
1202
    }
1✔
1203

1204
    @Override
1205
    protected ClientCall<ReqT, RespT> delegate() {
1206
      return delegate;
1✔
1207
    }
1208

1209
    @SuppressWarnings("unchecked")
1210
    @Override
1211
    public void start(Listener<RespT> observer, Metadata headers) {
1212
      PickSubchannelArgs args =
1✔
1213
          new PickSubchannelArgsImpl(method, headers, callOptions, NOOP_PICK_DETAILS_CONSUMER);
1✔
1214
      InternalConfigSelector.Result result = configSelector.selectConfig(args);
1✔
1215
      Status status = result.getStatus();
1✔
1216
      if (!status.isOk()) {
1✔
1217
        executeCloseObserverInContext(observer,
1✔
1218
            GrpcUtil.replaceInappropriateControlPlaneStatus(status));
1✔
1219
        delegate = (ClientCall<ReqT, RespT>) NOOP_CALL;
1✔
1220
        return;
1✔
1221
      }
1222
      ClientInterceptor interceptor = result.getInterceptor();
1✔
1223
      ManagedChannelServiceConfig config = (ManagedChannelServiceConfig) result.getConfig();
1✔
1224
      MethodInfo methodInfo = config.getMethodConfig(method);
1✔
1225
      if (methodInfo != null) {
1✔
1226
        callOptions = callOptions.withOption(MethodInfo.KEY, methodInfo);
1✔
1227
      }
1228
      if (interceptor != null) {
1✔
1229
        delegate = interceptor.interceptCall(method, callOptions, channel);
1✔
1230
      } else {
1231
        delegate = channel.newCall(method, callOptions);
×
1232
      }
1233
      delegate.start(observer, headers);
1✔
1234
    }
1✔
1235

1236
    private void executeCloseObserverInContext(
1237
        final Listener<RespT> observer, final Status status) {
1238
      class CloseInContext extends ContextRunnable {
1239
        CloseInContext() {
1✔
1240
          super(context);
1✔
1241
        }
1✔
1242

1243
        @Override
1244
        public void runInContext() {
1245
          observer.onClose(status, new Metadata());
1✔
1246
        }
1✔
1247
      }
1248

1249
      callExecutor.execute(new CloseInContext());
1✔
1250
    }
1✔
1251

1252
    @Override
1253
    public void cancel(@Nullable String message, @Nullable Throwable cause) {
1254
      if (delegate != null) {
×
1255
        delegate.cancel(message, cause);
×
1256
      }
1257
    }
×
1258
  }
1259

1260
  private static final ClientCall<Object, Object> NOOP_CALL = new ClientCall<Object, Object>() {
1✔
1261
    @Override
1262
    public void start(Listener<Object> responseListener, Metadata headers) {}
×
1263

1264
    @Override
1265
    public void request(int numMessages) {}
1✔
1266

1267
    @Override
1268
    public void cancel(String message, Throwable cause) {}
×
1269

1270
    @Override
1271
    public void halfClose() {}
×
1272

1273
    @Override
1274
    public void sendMessage(Object message) {}
×
1275

1276
    // Always returns {@code false}, since this is only used when the startup of the call fails.
1277
    @Override
1278
    public boolean isReady() {
1279
      return false;
×
1280
    }
1281
  };
1282

1283
  /**
1284
   * Terminate the channel if termination conditions are met.
1285
   */
1286
  // Must be run from syncContext
1287
  private void maybeTerminateChannel() {
1288
    if (terminated) {
1✔
1289
      return;
×
1290
    }
1291
    if (shutdown.get() && subchannels.isEmpty() && oobChannels.isEmpty()) {
1✔
1292
      channelLogger.log(ChannelLogLevel.INFO, "Terminated");
1✔
1293
      channelz.removeRootChannel(this);
1✔
1294
      executorPool.returnObject(executor);
1✔
1295
      balancerRpcExecutorHolder.release();
1✔
1296
      offloadExecutorHolder.release();
1✔
1297
      // Release the transport factory so that it can deallocate any resources.
1298
      transportFactory.close();
1✔
1299

1300
      terminated = true;
1✔
1301
      terminatedLatch.countDown();
1✔
1302
    }
1303
  }
1✔
1304

1305
  // Must be called from syncContext
1306
  private void handleInternalSubchannelState(ConnectivityStateInfo newState) {
1307
    if (newState.getState() == TRANSIENT_FAILURE || newState.getState() == IDLE) {
1✔
1308
      refreshNameResolution();
1✔
1309
    }
1310
  }
1✔
1311

1312
  @Override
1313
  @SuppressWarnings("deprecation")
1314
  public ConnectivityState getState(boolean requestConnection) {
1315
    ConnectivityState savedChannelState = channelStateManager.getState();
1✔
1316
    if (requestConnection && savedChannelState == IDLE) {
1✔
1317
      final class RequestConnection implements Runnable {
1✔
1318
        @Override
1319
        public void run() {
1320
          exitIdleMode();
1✔
1321
          if (subchannelPicker != null) {
1✔
1322
            subchannelPicker.requestConnection();
1✔
1323
          }
1324
          if (lbHelper != null) {
1✔
1325
            lbHelper.lb.requestConnection();
1✔
1326
          }
1327
        }
1✔
1328
      }
1329

1330
      syncContext.execute(new RequestConnection());
1✔
1331
    }
1332
    return savedChannelState;
1✔
1333
  }
1334

1335
  @Override
1336
  public void notifyWhenStateChanged(final ConnectivityState source, final Runnable callback) {
1337
    final class NotifyStateChanged implements Runnable {
1✔
1338
      @Override
1339
      public void run() {
1340
        channelStateManager.notifyWhenStateChanged(callback, executor, source);
1✔
1341
      }
1✔
1342
    }
1343

1344
    syncContext.execute(new NotifyStateChanged());
1✔
1345
  }
1✔
1346

1347
  @Override
1348
  public void resetConnectBackoff() {
1349
    final class ResetConnectBackoff implements Runnable {
1✔
1350
      @Override
1351
      public void run() {
1352
        if (shutdown.get()) {
1✔
1353
          return;
1✔
1354
        }
1355
        if (nameResolverStarted) {
1✔
1356
          refreshNameResolution();
1✔
1357
        }
1358
        for (InternalSubchannel subchannel : subchannels) {
1✔
1359
          subchannel.resetConnectBackoff();
1✔
1360
        }
1✔
1361
        for (OobChannel oobChannel : oobChannels) {
1✔
1362
          oobChannel.resetConnectBackoff();
×
1363
        }
×
1364
      }
1✔
1365
    }
1366

1367
    syncContext.execute(new ResetConnectBackoff());
1✔
1368
  }
1✔
1369

1370
  @Override
1371
  public void enterIdle() {
1372
    final class PrepareToLoseNetworkRunnable implements Runnable {
1✔
1373
      @Override
1374
      public void run() {
1375
        if (shutdown.get() || lbHelper == null) {
1✔
1376
          return;
1✔
1377
        }
1378
        cancelIdleTimer(/* permanent= */ false);
1✔
1379
        enterIdleMode();
1✔
1380
      }
1✔
1381
    }
1382

1383
    syncContext.execute(new PrepareToLoseNetworkRunnable());
1✔
1384
  }
1✔
1385

1386
  /**
1387
   * A registry that prevents channel shutdown from killing existing retry attempts that are in
1388
   * backoff.
1389
   */
1390
  private final class UncommittedRetriableStreamsRegistry {
1✔
1391
    // TODO(zdapeng): This means we would acquire a lock for each new retry-able stream,
1392
    // it's worthwhile to look for a lock-free approach.
1393
    final Object lock = new Object();
1✔
1394

1395
    @GuardedBy("lock")
1✔
1396
    Collection<ClientStream> uncommittedRetriableStreams = new HashSet<>();
1397

1398
    @GuardedBy("lock")
1399
    Status shutdownStatus;
1400

1401
    void onShutdown(Status reason) {
1402
      boolean shouldShutdownDelayedTransport = false;
1✔
1403
      synchronized (lock) {
1✔
1404
        if (shutdownStatus != null) {
1✔
1405
          return;
1✔
1406
        }
1407
        shutdownStatus = reason;
1✔
1408
        // Keep the delayedTransport open until there is no more uncommitted streams, b/c those
1409
        // retriable streams, which may be in backoff and not using any transport, are already
1410
        // started RPCs.
1411
        if (uncommittedRetriableStreams.isEmpty()) {
1✔
1412
          shouldShutdownDelayedTransport = true;
1✔
1413
        }
1414
      }
1✔
1415

1416
      if (shouldShutdownDelayedTransport) {
1✔
1417
        delayedTransport.shutdown(reason);
1✔
1418
      }
1419
    }
1✔
1420

1421
    void onShutdownNow(Status reason) {
1422
      onShutdown(reason);
1✔
1423
      Collection<ClientStream> streams;
1424

1425
      synchronized (lock) {
1✔
1426
        streams = new ArrayList<>(uncommittedRetriableStreams);
1✔
1427
      }
1✔
1428

1429
      for (ClientStream stream : streams) {
1✔
1430
        stream.cancel(reason);
1✔
1431
      }
1✔
1432
      delayedTransport.shutdownNow(reason);
1✔
1433
    }
1✔
1434

1435
    /**
1436
     * Registers a RetriableStream and return null if not shutdown, otherwise just returns the
1437
     * shutdown Status.
1438
     */
1439
    @Nullable
1440
    Status add(RetriableStream<?> retriableStream) {
1441
      synchronized (lock) {
1✔
1442
        if (shutdownStatus != null) {
1✔
1443
          return shutdownStatus;
1✔
1444
        }
1445
        uncommittedRetriableStreams.add(retriableStream);
1✔
1446
        return null;
1✔
1447
      }
1448
    }
1449

1450
    void remove(RetriableStream<?> retriableStream) {
1451
      Status shutdownStatusCopy = null;
1✔
1452

1453
      synchronized (lock) {
1✔
1454
        uncommittedRetriableStreams.remove(retriableStream);
1✔
1455
        if (uncommittedRetriableStreams.isEmpty()) {
1✔
1456
          shutdownStatusCopy = shutdownStatus;
1✔
1457
          // Because retriable transport is long-lived, we take this opportunity to down-size the
1458
          // hashmap.
1459
          uncommittedRetriableStreams = new HashSet<>();
1✔
1460
        }
1461
      }
1✔
1462

1463
      if (shutdownStatusCopy != null) {
1✔
1464
        delayedTransport.shutdown(shutdownStatusCopy);
1✔
1465
      }
1466
    }
1✔
1467
  }
1468

1469
  private final class LbHelperImpl extends LoadBalancer.Helper {
1✔
1470
    AutoConfiguredLoadBalancer lb;
1471

1472
    @Override
1473
    public AbstractSubchannel createSubchannel(CreateSubchannelArgs args) {
1474
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1475
      // No new subchannel should be created after load balancer has been shutdown.
1476
      checkState(!terminating, "Channel is being terminated");
1✔
1477
      return new SubchannelImpl(args);
1✔
1478
    }
1479

1480
    @Override
1481
    public void updateBalancingState(
1482
        final ConnectivityState newState, final SubchannelPicker newPicker) {
1483
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1484
      checkNotNull(newState, "newState");
1✔
1485
      checkNotNull(newPicker, "newPicker");
1✔
1486
      final class UpdateBalancingState implements Runnable {
1✔
1487
        @Override
1488
        public void run() {
1489
          if (LbHelperImpl.this != lbHelper) {
1✔
1490
            return;
1✔
1491
          }
1492
          updateSubchannelPicker(newPicker);
1✔
1493
          // It's not appropriate to report SHUTDOWN state from lb.
1494
          // Ignore the case of newState == SHUTDOWN for now.
1495
          if (newState != SHUTDOWN) {
1✔
1496
            channelLogger.log(
1✔
1497
                ChannelLogLevel.INFO, "Entering {0} state with picker: {1}", newState, newPicker);
1498
            channelStateManager.gotoState(newState);
1✔
1499
          }
1500
        }
1✔
1501
      }
1502

1503
      syncContext.execute(new UpdateBalancingState());
1✔
1504
    }
1✔
1505

1506
    @Override
1507
    public void refreshNameResolution() {
1508
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1509
      final class LoadBalancerRefreshNameResolution implements Runnable {
1✔
1510
        @Override
1511
        public void run() {
1512
          ManagedChannelImpl.this.refreshNameResolution();
1✔
1513
        }
1✔
1514
      }
1515

1516
      syncContext.execute(new LoadBalancerRefreshNameResolution());
1✔
1517
    }
1✔
1518

1519
    @Override
1520
    public ManagedChannel createOobChannel(EquivalentAddressGroup addressGroup, String authority) {
1521
      return createOobChannel(Collections.singletonList(addressGroup), authority);
×
1522
    }
1523

1524
    @Override
1525
    public ManagedChannel createOobChannel(List<EquivalentAddressGroup> addressGroup,
1526
        String authority) {
1527
      // TODO(ejona): can we be even stricter? Like terminating?
1528
      checkState(!terminated, "Channel is terminated");
1✔
1529
      long oobChannelCreationTime = timeProvider.currentTimeNanos();
1✔
1530
      InternalLogId oobLogId = InternalLogId.allocate("OobChannel", /*details=*/ null);
1✔
1531
      InternalLogId subchannelLogId =
1✔
1532
          InternalLogId.allocate("Subchannel-OOB", /*details=*/ authority);
1✔
1533
      ChannelTracer oobChannelTracer =
1✔
1534
          new ChannelTracer(
1535
              oobLogId, maxTraceEvents, oobChannelCreationTime,
1✔
1536
              "OobChannel for " + addressGroup);
1537
      final OobChannel oobChannel = new OobChannel(
1✔
1538
          authority, balancerRpcExecutorPool, oobTransportFactory.getScheduledExecutorService(),
1✔
1539
          syncContext, callTracerFactory.create(), oobChannelTracer, channelz, timeProvider);
1✔
1540
      channelTracer.reportEvent(new ChannelTrace.Event.Builder()
1✔
1541
          .setDescription("Child OobChannel created")
1✔
1542
          .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
1✔
1543
          .setTimestampNanos(oobChannelCreationTime)
1✔
1544
          .setChannelRef(oobChannel)
1✔
1545
          .build());
1✔
1546
      ChannelTracer subchannelTracer =
1✔
1547
          new ChannelTracer(subchannelLogId, maxTraceEvents, oobChannelCreationTime,
1✔
1548
              "Subchannel for " + addressGroup);
1549
      ChannelLogger subchannelLogger = new ChannelLoggerImpl(subchannelTracer, timeProvider);
1✔
1550
      final class ManagedOobChannelCallback extends InternalSubchannel.Callback {
1✔
1551
        @Override
1552
        void onTerminated(InternalSubchannel is) {
1553
          oobChannels.remove(oobChannel);
1✔
1554
          channelz.removeSubchannel(is);
1✔
1555
          oobChannel.handleSubchannelTerminated();
1✔
1556
          maybeTerminateChannel();
1✔
1557
        }
1✔
1558

1559
        @Override
1560
        void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) {
1561
          // TODO(chengyuanzhang): change to let LB policies explicitly manage OOB channel's
1562
          //  state and refresh name resolution if necessary.
1563
          handleInternalSubchannelState(newState);
1✔
1564
          oobChannel.handleSubchannelStateChange(newState);
1✔
1565
        }
1✔
1566
      }
1567

1568
      final InternalSubchannel internalSubchannel = new InternalSubchannel(
1✔
1569
          addressGroup,
1570
          authority, userAgent, backoffPolicyProvider, oobTransportFactory,
1✔
1571
          oobTransportFactory.getScheduledExecutorService(), stopwatchSupplier, syncContext,
1✔
1572
          // All callback methods are run from syncContext
1573
          new ManagedOobChannelCallback(),
1574
          channelz,
1✔
1575
          callTracerFactory.create(),
1✔
1576
          subchannelTracer,
1577
          subchannelLogId,
1578
          subchannelLogger,
1579
          transportFilters);
1✔
1580
      oobChannelTracer.reportEvent(new ChannelTrace.Event.Builder()
1✔
1581
          .setDescription("Child Subchannel created")
1✔
1582
          .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
1✔
1583
          .setTimestampNanos(oobChannelCreationTime)
1✔
1584
          .setSubchannelRef(internalSubchannel)
1✔
1585
          .build());
1✔
1586
      channelz.addSubchannel(oobChannel);
1✔
1587
      channelz.addSubchannel(internalSubchannel);
1✔
1588
      oobChannel.setSubchannel(internalSubchannel);
1✔
1589
      final class AddOobChannel implements Runnable {
1✔
1590
        @Override
1591
        public void run() {
1592
          if (terminating) {
1✔
1593
            oobChannel.shutdown();
×
1594
          }
1595
          if (!terminated) {
1✔
1596
            // If channel has not terminated, it will track the subchannel and block termination
1597
            // for it.
1598
            oobChannels.add(oobChannel);
1✔
1599
          }
1600
        }
1✔
1601
      }
1602

1603
      syncContext.execute(new AddOobChannel());
1✔
1604
      return oobChannel;
1✔
1605
    }
1606

1607
    @Deprecated
1608
    @Override
1609
    public ManagedChannelBuilder<?> createResolvingOobChannelBuilder(String target) {
1610
      return createResolvingOobChannelBuilder(target, new DefaultChannelCreds())
1✔
1611
          // Override authority to keep the old behavior.
1612
          // createResolvingOobChannelBuilder(String target) will be deleted soon.
1613
          .overrideAuthority(getAuthority());
1✔
1614
    }
1615

1616
    // TODO(creamsoup) prevent main channel to shutdown if oob channel is not terminated
1617
    // TODO(zdapeng) register the channel as a subchannel of the parent channel in channelz.
1618
    @Override
1619
    public ManagedChannelBuilder<?> createResolvingOobChannelBuilder(
1620
        final String target, final ChannelCredentials channelCreds) {
1621
      checkNotNull(channelCreds, "channelCreds");
1✔
1622

1623
      final class ResolvingOobChannelBuilder
1624
          extends ForwardingChannelBuilder2<ResolvingOobChannelBuilder> {
1625
        final ManagedChannelBuilder<?> delegate;
1626

1627
        ResolvingOobChannelBuilder() {
1✔
1628
          final ClientTransportFactory transportFactory;
1629
          CallCredentials callCredentials;
1630
          if (channelCreds instanceof DefaultChannelCreds) {
1✔
1631
            transportFactory = originalTransportFactory;
1✔
1632
            callCredentials = null;
1✔
1633
          } else {
1634
            SwapChannelCredentialsResult swapResult =
1✔
1635
                originalTransportFactory.swapChannelCredentials(channelCreds);
1✔
1636
            if (swapResult == null) {
1✔
1637
              delegate = Grpc.newChannelBuilder(target, channelCreds);
×
1638
              return;
×
1639
            } else {
1640
              transportFactory = swapResult.transportFactory;
1✔
1641
              callCredentials = swapResult.callCredentials;
1✔
1642
            }
1643
          }
1644
          ClientTransportFactoryBuilder transportFactoryBuilder =
1✔
1645
              new ClientTransportFactoryBuilder() {
1✔
1646
                @Override
1647
                public ClientTransportFactory buildClientTransportFactory() {
1648
                  return transportFactory;
1✔
1649
                }
1650
              };
1651
          delegate = new ManagedChannelImplBuilder(
1✔
1652
              target,
1653
              channelCreds,
1654
              callCredentials,
1655
              transportFactoryBuilder,
1656
              new FixedPortProvider(nameResolverArgs.getDefaultPort()))
1✔
1657
              .nameResolverRegistry(nameResolverRegistry);
1✔
1658
        }
1✔
1659

1660
        @Override
1661
        protected ManagedChannelBuilder<?> delegate() {
1662
          return delegate;
1✔
1663
        }
1664
      }
1665

1666
      checkState(!terminated, "Channel is terminated");
1✔
1667

1668
      @SuppressWarnings("deprecation")
1669
      ResolvingOobChannelBuilder builder = new ResolvingOobChannelBuilder();
1✔
1670

1671
      return builder
1✔
1672
          // TODO(zdapeng): executors should not outlive the parent channel.
1673
          .executor(executor)
1✔
1674
          .offloadExecutor(offloadExecutorHolder.getExecutor())
1✔
1675
          .maxTraceEvents(maxTraceEvents)
1✔
1676
          .proxyDetector(nameResolverArgs.getProxyDetector())
1✔
1677
          .userAgent(userAgent);
1✔
1678
    }
1679

1680
    @Override
1681
    public ChannelCredentials getUnsafeChannelCredentials() {
1682
      if (originalChannelCreds == null) {
×
1683
        return new DefaultChannelCreds();
×
1684
      }
1685
      return originalChannelCreds;
×
1686
    }
1687

1688
    @Override
1689
    public void updateOobChannelAddresses(ManagedChannel channel, EquivalentAddressGroup eag) {
1690
      updateOobChannelAddresses(channel, Collections.singletonList(eag));
×
1691
    }
×
1692

1693
    @Override
1694
    public void updateOobChannelAddresses(ManagedChannel channel,
1695
        List<EquivalentAddressGroup> eag) {
1696
      checkArgument(channel instanceof OobChannel,
1✔
1697
          "channel must have been returned from createOobChannel");
1698
      ((OobChannel) channel).updateAddresses(eag);
1✔
1699
    }
1✔
1700

1701
    @Override
1702
    public String getAuthority() {
1703
      return ManagedChannelImpl.this.authority();
1✔
1704
    }
1705

1706
    @Override
1707
    public SynchronizationContext getSynchronizationContext() {
1708
      return syncContext;
1✔
1709
    }
1710

1711
    @Override
1712
    public ScheduledExecutorService getScheduledExecutorService() {
1713
      return scheduledExecutor;
1✔
1714
    }
1715

1716
    @Override
1717
    public ChannelLogger getChannelLogger() {
1718
      return channelLogger;
1✔
1719
    }
1720

1721
    @Override
1722
    public NameResolver.Args getNameResolverArgs() {
1723
      return nameResolverArgs;
1✔
1724
    }
1725

1726
    @Override
1727
    public NameResolverRegistry getNameResolverRegistry() {
1728
      return nameResolverRegistry;
1✔
1729
    }
1730

1731
    /**
1732
     * A placeholder for channel creds if user did not specify channel creds for the channel.
1733
     */
1734
    // TODO(zdapeng): get rid of this class and let all ChannelBuilders always provide a non-null
1735
    //     channel creds.
1736
    final class DefaultChannelCreds extends ChannelCredentials {
1✔
1737
      @Override
1738
      public ChannelCredentials withoutBearerTokens() {
1739
        return this;
×
1740
      }
1741
    }
1742
  }
1743

1744
  final class NameResolverListener extends NameResolver.Listener2 {
1745
    final LbHelperImpl helper;
1746
    final NameResolver resolver;
1747

1748
    NameResolverListener(LbHelperImpl helperImpl, NameResolver resolver) {
1✔
1749
      this.helper = checkNotNull(helperImpl, "helperImpl");
1✔
1750
      this.resolver = checkNotNull(resolver, "resolver");
1✔
1751
    }
1✔
1752

1753
    @Override
1754
    public void onResult(final ResolutionResult resolutionResult) {
1755
      final class NamesResolved implements Runnable {
1✔
1756

1757
        @SuppressWarnings("ReferenceEquality")
1758
        @Override
1759
        public void run() {
1760
          if (ManagedChannelImpl.this.nameResolver != resolver) {
1✔
1761
            return;
1✔
1762
          }
1763

1764
          List<EquivalentAddressGroup> servers = resolutionResult.getAddresses();
1✔
1765
          channelLogger.log(
1✔
1766
              ChannelLogLevel.DEBUG,
1767
              "Resolved address: {0}, config={1}",
1768
              servers,
1769
              resolutionResult.getAttributes());
1✔
1770

1771
          if (lastResolutionState != ResolutionState.SUCCESS) {
1✔
1772
            channelLogger.log(ChannelLogLevel.INFO, "Address resolved: {0}", servers);
1✔
1773
            lastResolutionState = ResolutionState.SUCCESS;
1✔
1774
          }
1775

1776
          ConfigOrError configOrError = resolutionResult.getServiceConfig();
1✔
1777
          ResolutionResultListener resolutionResultListener = resolutionResult.getAttributes()
1✔
1778
              .get(RetryingNameResolver.RESOLUTION_RESULT_LISTENER_KEY);
1✔
1779
          InternalConfigSelector resolvedConfigSelector =
1✔
1780
              resolutionResult.getAttributes().get(InternalConfigSelector.KEY);
1✔
1781
          ManagedChannelServiceConfig validServiceConfig =
1782
              configOrError != null && configOrError.getConfig() != null
1✔
1783
                  ? (ManagedChannelServiceConfig) configOrError.getConfig()
1✔
1784
                  : null;
1✔
1785
          Status serviceConfigError = configOrError != null ? configOrError.getError() : null;
1✔
1786

1787
          ManagedChannelServiceConfig effectiveServiceConfig;
1788
          if (!lookUpServiceConfig) {
1✔
1789
            if (validServiceConfig != null) {
1✔
1790
              channelLogger.log(
1✔
1791
                  ChannelLogLevel.INFO,
1792
                  "Service config from name resolver discarded by channel settings");
1793
            }
1794
            effectiveServiceConfig =
1795
                defaultServiceConfig == null ? EMPTY_SERVICE_CONFIG : defaultServiceConfig;
1✔
1796
            if (resolvedConfigSelector != null) {
1✔
1797
              channelLogger.log(
1✔
1798
                  ChannelLogLevel.INFO,
1799
                  "Config selector from name resolver discarded by channel settings");
1800
            }
1801
            realChannel.updateConfigSelector(effectiveServiceConfig.getDefaultConfigSelector());
1✔
1802
          } else {
1803
            // Try to use config if returned from name resolver
1804
            // Otherwise, try to use the default config if available
1805
            if (validServiceConfig != null) {
1✔
1806
              effectiveServiceConfig = validServiceConfig;
1✔
1807
              if (resolvedConfigSelector != null) {
1✔
1808
                realChannel.updateConfigSelector(resolvedConfigSelector);
1✔
1809
                if (effectiveServiceConfig.getDefaultConfigSelector() != null) {
1✔
1810
                  channelLogger.log(
×
1811
                      ChannelLogLevel.DEBUG,
1812
                      "Method configs in service config will be discarded due to presence of"
1813
                          + "config-selector");
1814
                }
1815
              } else {
1816
                realChannel.updateConfigSelector(effectiveServiceConfig.getDefaultConfigSelector());
1✔
1817
              }
1818
            } else if (defaultServiceConfig != null) {
1✔
1819
              effectiveServiceConfig = defaultServiceConfig;
1✔
1820
              realChannel.updateConfigSelector(effectiveServiceConfig.getDefaultConfigSelector());
1✔
1821
              channelLogger.log(
1✔
1822
                  ChannelLogLevel.INFO,
1823
                  "Received no service config, using default service config");
1824
            } else if (serviceConfigError != null) {
1✔
1825
              if (!serviceConfigUpdated) {
1✔
1826
                // First DNS lookup has invalid service config, and cannot fall back to default
1827
                channelLogger.log(
1✔
1828
                    ChannelLogLevel.INFO,
1829
                    "Fallback to error due to invalid first service config without default config");
1830
                // This error could be an "inappropriate" control plane error that should not bleed
1831
                // through to client code using gRPC. We let them flow through here to the LB as
1832
                // we later check for these error codes when investigating pick results in
1833
                // GrpcUtil.getTransportFromPickResult().
1834
                onError(configOrError.getError());
1✔
1835
                if (resolutionResultListener != null) {
1✔
1836
                  resolutionResultListener.resolutionAttempted(configOrError.getError());
1✔
1837
                }
1838
                return;
1✔
1839
              } else {
1840
                effectiveServiceConfig = lastServiceConfig;
1✔
1841
              }
1842
            } else {
1843
              effectiveServiceConfig = EMPTY_SERVICE_CONFIG;
1✔
1844
              realChannel.updateConfigSelector(null);
1✔
1845
            }
1846
            if (!effectiveServiceConfig.equals(lastServiceConfig)) {
1✔
1847
              channelLogger.log(
1✔
1848
                  ChannelLogLevel.INFO,
1849
                  "Service config changed{0}",
1850
                  effectiveServiceConfig == EMPTY_SERVICE_CONFIG ? " to empty" : "");
1✔
1851
              lastServiceConfig = effectiveServiceConfig;
1✔
1852
              transportProvider.throttle = effectiveServiceConfig.getRetryThrottling();
1✔
1853
            }
1854

1855
            try {
1856
              // TODO(creamsoup): when `servers` is empty and lastResolutionStateCopy == SUCCESS
1857
              //  and lbNeedAddress, it shouldn't call the handleServiceConfigUpdate. But,
1858
              //  lbNeedAddress is not deterministic
1859
              serviceConfigUpdated = true;
1✔
1860
            } catch (RuntimeException re) {
×
1861
              logger.log(
×
1862
                  Level.WARNING,
1863
                  "[" + getLogId() + "] Unexpected exception from parsing service config",
×
1864
                  re);
1865
            }
1✔
1866
          }
1867

1868
          Attributes effectiveAttrs = resolutionResult.getAttributes();
1✔
1869
          // Call LB only if it's not shutdown.  If LB is shutdown, lbHelper won't match.
1870
          if (NameResolverListener.this.helper == ManagedChannelImpl.this.lbHelper) {
1✔
1871
            Attributes.Builder attrBuilder =
1✔
1872
                effectiveAttrs.toBuilder().discard(InternalConfigSelector.KEY);
1✔
1873
            Map<String, ?> healthCheckingConfig =
1✔
1874
                effectiveServiceConfig.getHealthCheckingConfig();
1✔
1875
            if (healthCheckingConfig != null) {
1✔
1876
              attrBuilder
1✔
1877
                  .set(LoadBalancer.ATTR_HEALTH_CHECKING_CONFIG, healthCheckingConfig)
1✔
1878
                  .build();
1✔
1879
            }
1880
            Attributes attributes = attrBuilder.build();
1✔
1881

1882
            Status addressAcceptanceStatus = helper.lb.tryAcceptResolvedAddresses(
1✔
1883
                ResolvedAddresses.newBuilder()
1✔
1884
                    .setAddresses(servers)
1✔
1885
                    .setAttributes(attributes)
1✔
1886
                    .setLoadBalancingPolicyConfig(effectiveServiceConfig.getLoadBalancingConfig())
1✔
1887
                    .build());
1✔
1888
            // If a listener is provided, let it know if the addresses were accepted.
1889
            if (resolutionResultListener != null) {
1✔
1890
              resolutionResultListener.resolutionAttempted(addressAcceptanceStatus);
1✔
1891
            }
1892
          }
1893
        }
1✔
1894
      }
1895

1896
      syncContext.execute(new NamesResolved());
1✔
1897
    }
1✔
1898

1899
    @Override
1900
    public void onError(final Status error) {
1901
      checkArgument(!error.isOk(), "the error status must not be OK");
1✔
1902
      final class NameResolverErrorHandler implements Runnable {
1✔
1903
        @Override
1904
        public void run() {
1905
          handleErrorInSyncContext(error);
1✔
1906
        }
1✔
1907
      }
1908

1909
      syncContext.execute(new NameResolverErrorHandler());
1✔
1910
    }
1✔
1911

1912
    private void handleErrorInSyncContext(Status error) {
1913
      logger.log(Level.WARNING, "[{0}] Failed to resolve name. status={1}",
1✔
1914
          new Object[] {getLogId(), error});
1✔
1915
      realChannel.onConfigError();
1✔
1916
      if (lastResolutionState != ResolutionState.ERROR) {
1✔
1917
        channelLogger.log(ChannelLogLevel.WARNING, "Failed to resolve name: {0}", error);
1✔
1918
        lastResolutionState = ResolutionState.ERROR;
1✔
1919
      }
1920
      // Call LB only if it's not shutdown.  If LB is shutdown, lbHelper won't match.
1921
      if (NameResolverListener.this.helper != ManagedChannelImpl.this.lbHelper) {
1✔
1922
        return;
1✔
1923
      }
1924

1925
      helper.lb.handleNameResolutionError(error);
1✔
1926
    }
1✔
1927
  }
1928

1929
  private final class SubchannelImpl extends AbstractSubchannel {
1930
    final CreateSubchannelArgs args;
1931
    final InternalLogId subchannelLogId;
1932
    final ChannelLoggerImpl subchannelLogger;
1933
    final ChannelTracer subchannelTracer;
1934
    List<EquivalentAddressGroup> addressGroups;
1935
    InternalSubchannel subchannel;
1936
    boolean started;
1937
    boolean shutdown;
1938
    ScheduledHandle delayedShutdownTask;
1939

1940
    SubchannelImpl(CreateSubchannelArgs args) {
1✔
1941
      checkNotNull(args, "args");
1✔
1942
      addressGroups = args.getAddresses();
1✔
1943
      if (authorityOverride != null) {
1✔
1944
        List<EquivalentAddressGroup> eagsWithoutOverrideAttr =
1✔
1945
            stripOverrideAuthorityAttributes(args.getAddresses());
1✔
1946
        args = args.toBuilder().setAddresses(eagsWithoutOverrideAttr).build();
1✔
1947
      }
1948
      this.args = args;
1✔
1949
      subchannelLogId = InternalLogId.allocate("Subchannel", /*details=*/ authority());
1✔
1950
      subchannelTracer = new ChannelTracer(
1✔
1951
          subchannelLogId, maxTraceEvents, timeProvider.currentTimeNanos(),
1✔
1952
          "Subchannel for " + args.getAddresses());
1✔
1953
      subchannelLogger = new ChannelLoggerImpl(subchannelTracer, timeProvider);
1✔
1954
    }
1✔
1955

1956
    @Override
1957
    public void start(final SubchannelStateListener listener) {
1958
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1959
      checkState(!started, "already started");
1✔
1960
      checkState(!shutdown, "already shutdown");
1✔
1961
      checkState(!terminating, "Channel is being terminated");
1✔
1962
      started = true;
1✔
1963
      final class ManagedInternalSubchannelCallback extends InternalSubchannel.Callback {
1✔
1964
        // All callbacks are run in syncContext
1965
        @Override
1966
        void onTerminated(InternalSubchannel is) {
1967
          subchannels.remove(is);
1✔
1968
          channelz.removeSubchannel(is);
1✔
1969
          maybeTerminateChannel();
1✔
1970
        }
1✔
1971

1972
        @Override
1973
        void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) {
1974
          checkState(listener != null, "listener is null");
1✔
1975
          listener.onSubchannelState(newState);
1✔
1976
        }
1✔
1977

1978
        @Override
1979
        void onInUse(InternalSubchannel is) {
1980
          inUseStateAggregator.updateObjectInUse(is, true);
1✔
1981
        }
1✔
1982

1983
        @Override
1984
        void onNotInUse(InternalSubchannel is) {
1985
          inUseStateAggregator.updateObjectInUse(is, false);
1✔
1986
        }
1✔
1987
      }
1988

1989
      final InternalSubchannel internalSubchannel = new InternalSubchannel(
1✔
1990
          args.getAddresses(),
1✔
1991
          authority(),
1✔
1992
          userAgent,
1✔
1993
          backoffPolicyProvider,
1✔
1994
          transportFactory,
1✔
1995
          transportFactory.getScheduledExecutorService(),
1✔
1996
          stopwatchSupplier,
1✔
1997
          syncContext,
1998
          new ManagedInternalSubchannelCallback(),
1999
          channelz,
1✔
2000
          callTracerFactory.create(),
1✔
2001
          subchannelTracer,
2002
          subchannelLogId,
2003
          subchannelLogger,
2004
          transportFilters);
1✔
2005

2006
      channelTracer.reportEvent(new ChannelTrace.Event.Builder()
1✔
2007
          .setDescription("Child Subchannel started")
1✔
2008
          .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
1✔
2009
          .setTimestampNanos(timeProvider.currentTimeNanos())
1✔
2010
          .setSubchannelRef(internalSubchannel)
1✔
2011
          .build());
1✔
2012

2013
      this.subchannel = internalSubchannel;
1✔
2014
      channelz.addSubchannel(internalSubchannel);
1✔
2015
      subchannels.add(internalSubchannel);
1✔
2016
    }
1✔
2017

2018
    @Override
2019
    InternalInstrumented<ChannelStats> getInstrumentedInternalSubchannel() {
2020
      checkState(started, "not started");
1✔
2021
      return subchannel;
1✔
2022
    }
2023

2024
    @Override
2025
    public void shutdown() {
2026
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
2027
      if (subchannel == null) {
1✔
2028
        // start() was not successful
2029
        shutdown = true;
×
2030
        return;
×
2031
      }
2032
      if (shutdown) {
1✔
2033
        if (terminating && delayedShutdownTask != null) {
1✔
2034
          // shutdown() was previously called when terminating == false, thus a delayed shutdown()
2035
          // was scheduled.  Now since terminating == true, We should expedite the shutdown.
2036
          delayedShutdownTask.cancel();
×
2037
          delayedShutdownTask = null;
×
2038
          // Will fall through to the subchannel.shutdown() at the end.
2039
        } else {
2040
          return;
1✔
2041
        }
2042
      } else {
2043
        shutdown = true;
1✔
2044
      }
2045
      // Add a delay to shutdown to deal with the race between 1) a transport being picked and
2046
      // newStream() being called on it, and 2) its Subchannel is shut down by LoadBalancer (e.g.,
2047
      // because of address change, or because LoadBalancer is shutdown by Channel entering idle
2048
      // mode). If (2) wins, the app will see a spurious error. We work around this by delaying
2049
      // shutdown of Subchannel for a few seconds here.
2050
      //
2051
      // TODO(zhangkun83): consider a better approach
2052
      // (https://github.com/grpc/grpc-java/issues/2562).
2053
      if (!terminating) {
1✔
2054
        final class ShutdownSubchannel implements Runnable {
1✔
2055
          @Override
2056
          public void run() {
2057
            subchannel.shutdown(SUBCHANNEL_SHUTDOWN_STATUS);
1✔
2058
          }
1✔
2059
        }
2060

2061
        delayedShutdownTask = syncContext.schedule(
1✔
2062
            new LogExceptionRunnable(new ShutdownSubchannel()),
2063
            SUBCHANNEL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS,
2064
            transportFactory.getScheduledExecutorService());
1✔
2065
        return;
1✔
2066
      }
2067
      // When terminating == true, no more real streams will be created. It's safe and also
2068
      // desirable to shutdown timely.
2069
      subchannel.shutdown(SHUTDOWN_STATUS);
1✔
2070
    }
1✔
2071

2072
    @Override
2073
    public void requestConnection() {
2074
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
2075
      checkState(started, "not started");
1✔
2076
      subchannel.obtainActiveTransport();
1✔
2077
    }
1✔
2078

2079
    @Override
2080
    public List<EquivalentAddressGroup> getAllAddresses() {
2081
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
2082
      checkState(started, "not started");
1✔
2083
      return addressGroups;
1✔
2084
    }
2085

2086
    @Override
2087
    public Attributes getAttributes() {
2088
      return args.getAttributes();
1✔
2089
    }
2090

2091
    @Override
2092
    public String toString() {
2093
      return subchannelLogId.toString();
1✔
2094
    }
2095

2096
    @Override
2097
    public Channel asChannel() {
2098
      checkState(started, "not started");
1✔
2099
      return new SubchannelChannel(
1✔
2100
          subchannel, balancerRpcExecutorHolder.getExecutor(),
1✔
2101
          transportFactory.getScheduledExecutorService(),
1✔
2102
          callTracerFactory.create(),
1✔
2103
          new AtomicReference<InternalConfigSelector>(null));
2104
    }
2105

2106
    @Override
2107
    public Object getInternalSubchannel() {
2108
      checkState(started, "Subchannel is not started");
1✔
2109
      return subchannel;
1✔
2110
    }
2111

2112
    @Override
2113
    public ChannelLogger getChannelLogger() {
2114
      return subchannelLogger;
1✔
2115
    }
2116

2117
    @Override
2118
    public void updateAddresses(List<EquivalentAddressGroup> addrs) {
2119
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
2120
      addressGroups = addrs;
1✔
2121
      if (authorityOverride != null) {
1✔
2122
        addrs = stripOverrideAuthorityAttributes(addrs);
1✔
2123
      }
2124
      subchannel.updateAddresses(addrs);
1✔
2125
    }
1✔
2126

2127
    private List<EquivalentAddressGroup> stripOverrideAuthorityAttributes(
2128
        List<EquivalentAddressGroup> eags) {
2129
      List<EquivalentAddressGroup> eagsWithoutOverrideAttr = new ArrayList<>();
1✔
2130
      for (EquivalentAddressGroup eag : eags) {
1✔
2131
        EquivalentAddressGroup eagWithoutOverrideAttr = new EquivalentAddressGroup(
1✔
2132
            eag.getAddresses(),
1✔
2133
            eag.getAttributes().toBuilder().discard(ATTR_AUTHORITY_OVERRIDE).build());
1✔
2134
        eagsWithoutOverrideAttr.add(eagWithoutOverrideAttr);
1✔
2135
      }
1✔
2136
      return Collections.unmodifiableList(eagsWithoutOverrideAttr);
1✔
2137
    }
2138
  }
2139

2140
  @Override
2141
  public String toString() {
2142
    return MoreObjects.toStringHelper(this)
1✔
2143
        .add("logId", logId.getId())
1✔
2144
        .add("target", target)
1✔
2145
        .toString();
1✔
2146
  }
2147

2148
  /**
2149
   * Called from syncContext.
2150
   */
2151
  private final class DelayedTransportListener implements ManagedClientTransport.Listener {
1✔
2152
    @Override
2153
    public void transportShutdown(Status s) {
2154
      checkState(shutdown.get(), "Channel must have been shut down");
1✔
2155
    }
1✔
2156

2157
    @Override
2158
    public void transportReady() {
2159
      // Don't care
2160
    }
×
2161

2162
    @Override
2163
    public Attributes filterTransport(Attributes attributes) {
2164
      return attributes;
×
2165
    }
2166

2167
    @Override
2168
    public void transportInUse(final boolean inUse) {
2169
      inUseStateAggregator.updateObjectInUse(delayedTransport, inUse);
1✔
2170
    }
1✔
2171

2172
    @Override
2173
    public void transportTerminated() {
2174
      checkState(shutdown.get(), "Channel must have been shut down");
1✔
2175
      terminating = true;
1✔
2176
      shutdownNameResolverAndLoadBalancer(false);
1✔
2177
      // No need to call channelStateManager since we are already in SHUTDOWN state.
2178
      // Until LoadBalancer is shutdown, it may still create new subchannels.  We catch them
2179
      // here.
2180
      maybeShutdownNowSubchannels();
1✔
2181
      maybeTerminateChannel();
1✔
2182
    }
1✔
2183
  }
2184

2185
  /**
2186
   * Must be accessed from syncContext.
2187
   */
2188
  private final class IdleModeStateAggregator extends InUseStateAggregator<Object> {
1✔
2189
    @Override
2190
    protected void handleInUse() {
2191
      exitIdleMode();
1✔
2192
    }
1✔
2193

2194
    @Override
2195
    protected void handleNotInUse() {
2196
      if (shutdown.get()) {
1✔
2197
        return;
1✔
2198
      }
2199
      rescheduleIdleTimer();
1✔
2200
    }
1✔
2201
  }
2202

2203
  /**
2204
   * Lazily request for Executor from an executor pool.
2205
   * Also act as an Executor directly to simply run a cmd
2206
   */
2207
  @VisibleForTesting
2208
  static final class ExecutorHolder implements Executor {
2209
    private final ObjectPool<? extends Executor> pool;
2210
    private Executor executor;
2211

2212
    ExecutorHolder(ObjectPool<? extends Executor> executorPool) {
1✔
2213
      this.pool = checkNotNull(executorPool, "executorPool");
1✔
2214
    }
1✔
2215

2216
    synchronized Executor getExecutor() {
2217
      if (executor == null) {
1✔
2218
        executor = checkNotNull(pool.getObject(), "%s.getObject()", executor);
1✔
2219
      }
2220
      return executor;
1✔
2221
    }
2222

2223
    synchronized void release() {
2224
      if (executor != null) {
1✔
2225
        executor = pool.returnObject(executor);
1✔
2226
      }
2227
    }
1✔
2228

2229
    @Override
2230
    public void execute(Runnable command) {
2231
      getExecutor().execute(command);
1✔
2232
    }
1✔
2233
  }
2234

2235
  private static final class RestrictedScheduledExecutor implements ScheduledExecutorService {
2236
    final ScheduledExecutorService delegate;
2237

2238
    private RestrictedScheduledExecutor(ScheduledExecutorService delegate) {
1✔
2239
      this.delegate = checkNotNull(delegate, "delegate");
1✔
2240
    }
1✔
2241

2242
    @Override
2243
    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
2244
      return delegate.schedule(callable, delay, unit);
×
2245
    }
2246

2247
    @Override
2248
    public ScheduledFuture<?> schedule(Runnable cmd, long delay, TimeUnit unit) {
2249
      return delegate.schedule(cmd, delay, unit);
1✔
2250
    }
2251

2252
    @Override
2253
    public ScheduledFuture<?> scheduleAtFixedRate(
2254
        Runnable command, long initialDelay, long period, TimeUnit unit) {
2255
      return delegate.scheduleAtFixedRate(command, initialDelay, period, unit);
×
2256
    }
2257

2258
    @Override
2259
    public ScheduledFuture<?> scheduleWithFixedDelay(
2260
        Runnable command, long initialDelay, long delay, TimeUnit unit) {
2261
      return delegate.scheduleWithFixedDelay(command, initialDelay, delay, unit);
×
2262
    }
2263

2264
    @Override
2265
    public boolean awaitTermination(long timeout, TimeUnit unit)
2266
        throws InterruptedException {
2267
      return delegate.awaitTermination(timeout, unit);
×
2268
    }
2269

2270
    @Override
2271
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
2272
        throws InterruptedException {
2273
      return delegate.invokeAll(tasks);
×
2274
    }
2275

2276
    @Override
2277
    public <T> List<Future<T>> invokeAll(
2278
        Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
2279
        throws InterruptedException {
2280
      return delegate.invokeAll(tasks, timeout, unit);
×
2281
    }
2282

2283
    @Override
2284
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
2285
        throws InterruptedException, ExecutionException {
2286
      return delegate.invokeAny(tasks);
×
2287
    }
2288

2289
    @Override
2290
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
2291
        throws InterruptedException, ExecutionException, TimeoutException {
2292
      return delegate.invokeAny(tasks, timeout, unit);
×
2293
    }
2294

2295
    @Override
2296
    public boolean isShutdown() {
2297
      return delegate.isShutdown();
×
2298
    }
2299

2300
    @Override
2301
    public boolean isTerminated() {
2302
      return delegate.isTerminated();
×
2303
    }
2304

2305
    @Override
2306
    public void shutdown() {
2307
      throw new UnsupportedOperationException("Restricted: shutdown() is not allowed");
1✔
2308
    }
2309

2310
    @Override
2311
    public List<Runnable> shutdownNow() {
2312
      throw new UnsupportedOperationException("Restricted: shutdownNow() is not allowed");
1✔
2313
    }
2314

2315
    @Override
2316
    public <T> Future<T> submit(Callable<T> task) {
2317
      return delegate.submit(task);
×
2318
    }
2319

2320
    @Override
2321
    public Future<?> submit(Runnable task) {
2322
      return delegate.submit(task);
×
2323
    }
2324

2325
    @Override
2326
    public <T> Future<T> submit(Runnable task, T result) {
2327
      return delegate.submit(task, result);
×
2328
    }
2329

2330
    @Override
2331
    public void execute(Runnable command) {
2332
      delegate.execute(command);
×
2333
    }
×
2334
  }
2335

2336
  /**
   * Status of the most recent name resolution.
   */
  enum ResolutionState { NO_RESOLUTION, SUCCESS, ERROR }
2344
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc