• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19182

01 May 2024 04:19PM UTC coverage: 88.315% (-0.01%) from 88.329%
#19182

push

github

ejona86
Plumb target to load balancer

gRFC A78 has WRR and pick-first include a `grpc.target` label, defined
in A66:

> `grpc.target` : Canonicalized target URI used when creating gRPC
> Channel, e.g. "dns:///pubsub.googleapis.com:443",
> "xds:///helloworld-gke:8000". Canonicalized target URI is the form
> with the scheme included if the user didn't mention the scheme
> (`scheme://[authority]/path`). For channels such as inprocess channels
> where a target URI is not available, implementations can synthesize a
> target URI.

31448 of 35609 relevant lines covered (88.31%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.44
/../core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
1
/*
2
 * Copyright 2016 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static com.google.common.base.Preconditions.checkState;
22
import static io.grpc.ClientStreamTracer.NAME_RESOLUTION_DELAYED;
23
import static io.grpc.ConnectivityState.CONNECTING;
24
import static io.grpc.ConnectivityState.IDLE;
25
import static io.grpc.ConnectivityState.SHUTDOWN;
26
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
27
import static io.grpc.EquivalentAddressGroup.ATTR_AUTHORITY_OVERRIDE;
28

29
import com.google.common.annotations.VisibleForTesting;
30
import com.google.common.base.MoreObjects;
31
import com.google.common.base.Stopwatch;
32
import com.google.common.base.Supplier;
33
import com.google.common.util.concurrent.ListenableFuture;
34
import com.google.common.util.concurrent.SettableFuture;
35
import io.grpc.Attributes;
36
import io.grpc.CallCredentials;
37
import io.grpc.CallOptions;
38
import io.grpc.Channel;
39
import io.grpc.ChannelCredentials;
40
import io.grpc.ChannelLogger;
41
import io.grpc.ChannelLogger.ChannelLogLevel;
42
import io.grpc.ClientCall;
43
import io.grpc.ClientInterceptor;
44
import io.grpc.ClientInterceptors;
45
import io.grpc.ClientStreamTracer;
46
import io.grpc.ClientTransportFilter;
47
import io.grpc.CompressorRegistry;
48
import io.grpc.ConnectivityState;
49
import io.grpc.ConnectivityStateInfo;
50
import io.grpc.Context;
51
import io.grpc.Deadline;
52
import io.grpc.DecompressorRegistry;
53
import io.grpc.EquivalentAddressGroup;
54
import io.grpc.ForwardingChannelBuilder2;
55
import io.grpc.ForwardingClientCall;
56
import io.grpc.Grpc;
57
import io.grpc.InternalChannelz;
58
import io.grpc.InternalChannelz.ChannelStats;
59
import io.grpc.InternalChannelz.ChannelTrace;
60
import io.grpc.InternalConfigSelector;
61
import io.grpc.InternalInstrumented;
62
import io.grpc.InternalLogId;
63
import io.grpc.InternalWithLogId;
64
import io.grpc.LoadBalancer;
65
import io.grpc.LoadBalancer.CreateSubchannelArgs;
66
import io.grpc.LoadBalancer.PickResult;
67
import io.grpc.LoadBalancer.PickSubchannelArgs;
68
import io.grpc.LoadBalancer.ResolvedAddresses;
69
import io.grpc.LoadBalancer.SubchannelPicker;
70
import io.grpc.LoadBalancer.SubchannelStateListener;
71
import io.grpc.ManagedChannel;
72
import io.grpc.ManagedChannelBuilder;
73
import io.grpc.Metadata;
74
import io.grpc.MethodDescriptor;
75
import io.grpc.NameResolver;
76
import io.grpc.NameResolver.ConfigOrError;
77
import io.grpc.NameResolver.ResolutionResult;
78
import io.grpc.NameResolverProvider;
79
import io.grpc.NameResolverRegistry;
80
import io.grpc.ProxyDetector;
81
import io.grpc.Status;
82
import io.grpc.SynchronizationContext;
83
import io.grpc.SynchronizationContext.ScheduledHandle;
84
import io.grpc.internal.AutoConfiguredLoadBalancerFactory.AutoConfiguredLoadBalancer;
85
import io.grpc.internal.ClientCallImpl.ClientStreamProvider;
86
import io.grpc.internal.ClientTransportFactory.SwapChannelCredentialsResult;
87
import io.grpc.internal.ManagedChannelImplBuilder.ClientTransportFactoryBuilder;
88
import io.grpc.internal.ManagedChannelImplBuilder.FixedPortProvider;
89
import io.grpc.internal.ManagedChannelServiceConfig.MethodInfo;
90
import io.grpc.internal.ManagedChannelServiceConfig.ServiceConfigConvertedSelector;
91
import io.grpc.internal.RetriableStream.ChannelBufferMeter;
92
import io.grpc.internal.RetriableStream.Throttle;
93
import io.grpc.internal.RetryingNameResolver.ResolutionResultListener;
94
import java.net.SocketAddress;
95
import java.net.URI;
96
import java.net.URISyntaxException;
97
import java.util.ArrayList;
98
import java.util.Collection;
99
import java.util.Collections;
100
import java.util.HashSet;
101
import java.util.LinkedHashSet;
102
import java.util.List;
103
import java.util.Map;
104
import java.util.Set;
105
import java.util.concurrent.Callable;
106
import java.util.concurrent.CountDownLatch;
107
import java.util.concurrent.ExecutionException;
108
import java.util.concurrent.Executor;
109
import java.util.concurrent.Future;
110
import java.util.concurrent.ScheduledExecutorService;
111
import java.util.concurrent.ScheduledFuture;
112
import java.util.concurrent.TimeUnit;
113
import java.util.concurrent.TimeoutException;
114
import java.util.concurrent.atomic.AtomicBoolean;
115
import java.util.concurrent.atomic.AtomicReference;
116
import java.util.logging.Level;
117
import java.util.logging.Logger;
118
import java.util.regex.Pattern;
119
import javax.annotation.Nullable;
120
import javax.annotation.concurrent.GuardedBy;
121
import javax.annotation.concurrent.ThreadSafe;
122

123
/** A communication channel for making outgoing RPCs. */
124
@ThreadSafe
125
final class ManagedChannelImpl extends ManagedChannel implements
126
    InternalInstrumented<ChannelStats> {
127
  @VisibleForTesting
128
  static final Logger logger = Logger.getLogger(ManagedChannelImpl.class.getName());
1✔
129

130
  // Matching this pattern means the target string is a URI target or at least intended to be one.
131
  // A URI target must be an absolute hierarchical URI.
132
  // From RFC 2396: scheme = alpha *( alpha | digit | "+" | "-" | "." )
133
  @VisibleForTesting
134
  static final Pattern URI_PATTERN = Pattern.compile("[a-zA-Z][a-zA-Z0-9+.-]*:/.*");
1✔
135

136
  static final long IDLE_TIMEOUT_MILLIS_DISABLE = -1;
137

138
  static final long SUBCHANNEL_SHUTDOWN_DELAY_SECONDS = 5;
139

140
  @VisibleForTesting
141
  static final Status SHUTDOWN_NOW_STATUS =
1✔
142
      Status.UNAVAILABLE.withDescription("Channel shutdownNow invoked");
1✔
143

144
  @VisibleForTesting
145
  static final Status SHUTDOWN_STATUS =
1✔
146
      Status.UNAVAILABLE.withDescription("Channel shutdown invoked");
1✔
147

148
  @VisibleForTesting
149
  static final Status SUBCHANNEL_SHUTDOWN_STATUS =
1✔
150
      Status.UNAVAILABLE.withDescription("Subchannel shutdown invoked");
1✔
151

152
  private static final ManagedChannelServiceConfig EMPTY_SERVICE_CONFIG =
153
      ManagedChannelServiceConfig.empty();
1✔
154
  private static final InternalConfigSelector INITIAL_PENDING_SELECTOR =
1✔
155
      new InternalConfigSelector() {
1✔
156
        @Override
157
        public Result selectConfig(PickSubchannelArgs args) {
158
          throw new IllegalStateException("Resolution is pending");
×
159
        }
160
      };
161
  private static final LoadBalancer.PickDetailsConsumer NOOP_PICK_DETAILS_CONSUMER =
1✔
162
      new LoadBalancer.PickDetailsConsumer() {};
1✔
163

164
  private final InternalLogId logId;
165
  private final String target;
166
  @Nullable
167
  private final String authorityOverride;
168
  private final NameResolverRegistry nameResolverRegistry;
169
  private final URI targetUri;
170
  private final NameResolverProvider nameResolverProvider;
171
  private final NameResolver.Args nameResolverArgs;
172
  private final AutoConfiguredLoadBalancerFactory loadBalancerFactory;
173
  private final ClientTransportFactory originalTransportFactory;
174
  @Nullable
175
  private final ChannelCredentials originalChannelCreds;
176
  private final ClientTransportFactory transportFactory;
177
  private final ClientTransportFactory oobTransportFactory;
178
  private final RestrictedScheduledExecutor scheduledExecutor;
179
  private final Executor executor;
180
  private final ObjectPool<? extends Executor> executorPool;
181
  private final ObjectPool<? extends Executor> balancerRpcExecutorPool;
182
  private final ExecutorHolder balancerRpcExecutorHolder;
183
  private final ExecutorHolder offloadExecutorHolder;
184
  private final TimeProvider timeProvider;
185
  private final int maxTraceEvents;
186

187
  @VisibleForTesting
1✔
188
  final SynchronizationContext syncContext = new SynchronizationContext(
189
      new Thread.UncaughtExceptionHandler() {
1✔
190
        @Override
191
        public void uncaughtException(Thread t, Throwable e) {
192
          logger.log(
1✔
193
              Level.SEVERE,
194
              "[" + getLogId() + "] Uncaught exception in the SynchronizationContext. Panic!",
1✔
195
              e);
196
          panic(e);
1✔
197
        }
1✔
198
      });
199

200
  private boolean fullStreamDecompression;
201

202
  private final DecompressorRegistry decompressorRegistry;
203
  private final CompressorRegistry compressorRegistry;
204

205
  private final Supplier<Stopwatch> stopwatchSupplier;
206
  /** The timout before entering idle mode. */
207
  private final long idleTimeoutMillis;
208

209
  private final ConnectivityStateManager channelStateManager = new ConnectivityStateManager();
1✔
210
  private final BackoffPolicy.Provider backoffPolicyProvider;
211

212
  /**
213
   * We delegate to this channel, so that we can have interceptors as necessary. If there aren't
214
   * any interceptors and the {@link io.grpc.BinaryLog} is {@code null} then this will just be a
215
   * {@link RealChannel}.
216
   */
217
  private final Channel interceptorChannel;
218

219
  private final List<ClientTransportFilter> transportFilters;
220
  @Nullable private final String userAgent;
221

222
  // Only null after channel is terminated. Must be assigned from the syncContext.
223
  private NameResolver nameResolver;
224

225
  // Must be accessed from the syncContext.
226
  private boolean nameResolverStarted;
227

228
  // null when channel is in idle mode.  Must be assigned from syncContext.
229
  @Nullable
230
  private LbHelperImpl lbHelper;
231

232
  // Must ONLY be assigned from updateSubchannelPicker(), which is called from syncContext.
233
  // null if channel is in idle mode.
234
  @Nullable
235
  private volatile SubchannelPicker subchannelPicker;
236

237
  // Must be accessed from the syncContext
238
  private boolean panicMode;
239

240
  // Must be mutated from syncContext
241
  // If any monitoring hook to be added later needs to get a snapshot of this Set, we could
242
  // switch to a ConcurrentHashMap.
243
  private final Set<InternalSubchannel> subchannels = new HashSet<>(16, .75f);
1✔
244

245
  // Must be accessed from syncContext
246
  @Nullable
247
  private Collection<RealChannel.PendingCall<?, ?>> pendingCalls;
248
  private final Object pendingCallsInUseObject = new Object();
1✔
249

250
  // Must be mutated from syncContext
251
  private final Set<OobChannel> oobChannels = new HashSet<>(1, .75f);
1✔
252

253
  // reprocess() must be run from syncContext
254
  private final DelayedClientTransport delayedTransport;
255
  private final UncommittedRetriableStreamsRegistry uncommittedRetriableStreamsRegistry
1✔
256
      = new UncommittedRetriableStreamsRegistry();
257

258
  // Shutdown states.
259
  //
260
  // Channel's shutdown process:
261
  // 1. shutdown(): stop accepting new calls from applications
262
  //   1a shutdown <- true
263
  //   1b subchannelPicker <- null
264
  //   1c delayedTransport.shutdown()
265
  // 2. delayedTransport terminated: stop stream-creation functionality
266
  //   2a terminating <- true
267
  //   2b loadBalancer.shutdown()
268
  //     * LoadBalancer will shutdown subchannels and OOB channels
269
  //   2c loadBalancer <- null
270
  //   2d nameResolver.shutdown()
271
  //   2e nameResolver <- null
272
  // 3. All subchannels and OOB channels terminated: Channel considered terminated
273

274
  private final AtomicBoolean shutdown = new AtomicBoolean(false);
1✔
275
  // Must only be mutated and read from syncContext
276
  private boolean shutdownNowed;
277
  // Must only be mutated from syncContext
278
  private boolean terminating;
279
  // Must be mutated from syncContext
280
  private volatile boolean terminated;
281
  private final CountDownLatch terminatedLatch = new CountDownLatch(1);
1✔
282

283
  private final CallTracer.Factory callTracerFactory;
284
  private final CallTracer channelCallTracer;
285
  private final ChannelTracer channelTracer;
286
  private final ChannelLogger channelLogger;
287
  private final InternalChannelz channelz;
288
  private final RealChannel realChannel;
289
  // Must be mutated and read from syncContext
290
  // a flag for doing channel tracing when flipped
291
  private ResolutionState lastResolutionState = ResolutionState.NO_RESOLUTION;
1✔
292
  // Must be mutated and read from constructor or syncContext
293
  // used for channel tracing when value changed
294
  private ManagedChannelServiceConfig lastServiceConfig = EMPTY_SERVICE_CONFIG;
1✔
295

296
  @Nullable
297
  private final ManagedChannelServiceConfig defaultServiceConfig;
298
  // Must be mutated and read from constructor or syncContext
299
  private boolean serviceConfigUpdated = false;
1✔
300
  private final boolean lookUpServiceConfig;
301

302
  // One instance per channel.
303
  private final ChannelBufferMeter channelBufferUsed = new ChannelBufferMeter();
1✔
304

305
  private final long perRpcBufferLimit;
306
  private final long channelBufferLimit;
307

308
  // Temporary false flag that can skip the retry code path.
309
  private final boolean retryEnabled;
310

311
  private final Deadline.Ticker ticker = Deadline.getSystemTicker();
1✔
312

313
  // Called from syncContext
314
  private final ManagedClientTransport.Listener delayedTransportListener =
1✔
315
      new DelayedTransportListener();
316

317
  // Must be called from syncContext
318
  private void maybeShutdownNowSubchannels() {
319
    if (shutdownNowed) {
1✔
320
      for (InternalSubchannel subchannel : subchannels) {
1✔
321
        subchannel.shutdownNow(SHUTDOWN_NOW_STATUS);
1✔
322
      }
1✔
323
      for (OobChannel oobChannel : oobChannels) {
1✔
324
        oobChannel.getInternalSubchannel().shutdownNow(SHUTDOWN_NOW_STATUS);
1✔
325
      }
1✔
326
    }
327
  }
1✔
328

329
  // Must be accessed from syncContext
330
  @VisibleForTesting
1✔
331
  final InUseStateAggregator<Object> inUseStateAggregator = new IdleModeStateAggregator();
332

333
  @Override
334
  public ListenableFuture<ChannelStats> getStats() {
335
    final SettableFuture<ChannelStats> ret = SettableFuture.create();
1✔
336
    final class StatsFetcher implements Runnable {
1✔
337
      @Override
338
      public void run() {
339
        ChannelStats.Builder builder = new InternalChannelz.ChannelStats.Builder();
1✔
340
        channelCallTracer.updateBuilder(builder);
1✔
341
        channelTracer.updateBuilder(builder);
1✔
342
        builder.setTarget(target).setState(channelStateManager.getState());
1✔
343
        List<InternalWithLogId> children = new ArrayList<>();
1✔
344
        children.addAll(subchannels);
1✔
345
        children.addAll(oobChannels);
1✔
346
        builder.setSubchannels(children);
1✔
347
        ret.set(builder.build());
1✔
348
      }
1✔
349
    }
350

351
    // subchannels and oobchannels can only be accessed from syncContext
352
    syncContext.execute(new StatsFetcher());
1✔
353
    return ret;
1✔
354
  }
355

356
  @Override
357
  public InternalLogId getLogId() {
358
    return logId;
1✔
359
  }
360

361
  // Run from syncContext
362
  private class IdleModeTimer implements Runnable {
1✔
363

364
    @Override
365
    public void run() {
366
      // Workaround timer scheduled while in idle mode. This can happen from handleNotInUse() after
367
      // an explicit enterIdleMode() by the user. Protecting here as other locations are a bit too
368
      // subtle to change rapidly to resolve the channel panic. See #8714
369
      if (lbHelper == null) {
1✔
370
        return;
×
371
      }
372
      enterIdleMode();
1✔
373
    }
1✔
374
  }
375

376
  // Must be called from syncContext
377
  private void shutdownNameResolverAndLoadBalancer(boolean channelIsActive) {
378
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
379
    if (channelIsActive) {
1✔
380
      checkState(nameResolverStarted, "nameResolver is not started");
1✔
381
      checkState(lbHelper != null, "lbHelper is null");
1✔
382
    }
383
    if (nameResolver != null) {
1✔
384
      nameResolver.shutdown();
1✔
385
      nameResolverStarted = false;
1✔
386
      if (channelIsActive) {
1✔
387
        nameResolver = getNameResolver(
1✔
388
            targetUri, authorityOverride, nameResolverProvider, nameResolverArgs);
389
      } else {
390
        nameResolver = null;
1✔
391
      }
392
    }
393
    if (lbHelper != null) {
1✔
394
      lbHelper.lb.shutdown();
1✔
395
      lbHelper = null;
1✔
396
    }
397
    subchannelPicker = null;
1✔
398
  }
1✔
399

400
  /**
401
   * Make the channel exit idle mode, if it's in it.
402
   *
403
   * <p>Must be called from syncContext
404
   */
405
  @VisibleForTesting
406
  void exitIdleMode() {
407
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
408
    if (shutdown.get() || panicMode) {
1✔
409
      return;
1✔
410
    }
411
    if (inUseStateAggregator.isInUse()) {
1✔
412
      // Cancel the timer now, so that a racing due timer will not put Channel on idleness
413
      // when the caller of exitIdleMode() is about to use the returned loadBalancer.
414
      cancelIdleTimer(false);
1✔
415
    } else {
416
      // exitIdleMode() may be called outside of inUseStateAggregator.handleNotInUse() while
417
      // isInUse() == false, in which case we still need to schedule the timer.
418
      rescheduleIdleTimer();
1✔
419
    }
420
    if (lbHelper != null) {
1✔
421
      return;
1✔
422
    }
423
    channelLogger.log(ChannelLogLevel.INFO, "Exiting idle mode");
1✔
424
    LbHelperImpl lbHelper = new LbHelperImpl();
1✔
425
    lbHelper.lb = loadBalancerFactory.newLoadBalancer(lbHelper);
1✔
426
    // Delay setting lbHelper until fully initialized, since loadBalancerFactory is user code and
427
    // may throw. We don't want to confuse our state, even if we will enter panic mode.
428
    this.lbHelper = lbHelper;
1✔
429

430
    channelStateManager.gotoState(CONNECTING);
1✔
431
    NameResolverListener listener = new NameResolverListener(lbHelper, nameResolver);
1✔
432
    nameResolver.start(listener);
1✔
433
    nameResolverStarted = true;
1✔
434
  }
1✔
435

436
  // Must be run from syncContext
437
  private void enterIdleMode() {
438
    // nameResolver and loadBalancer are guaranteed to be non-null.  If any of them were null,
439
    // either the idleModeTimer ran twice without exiting the idle mode, or the task in shutdown()
440
    // did not cancel idleModeTimer, or enterIdle() ran while shutdown or in idle, all of
441
    // which are bugs.
442
    shutdownNameResolverAndLoadBalancer(true);
1✔
443
    delayedTransport.reprocess(null);
1✔
444
    channelLogger.log(ChannelLogLevel.INFO, "Entering IDLE state");
1✔
445
    channelStateManager.gotoState(IDLE);
1✔
446
    // If the inUseStateAggregator still considers pending calls to be queued up or the delayed
447
    // transport to be holding some we need to exit idle mode to give these calls a chance to
448
    // be processed.
449
    if (inUseStateAggregator.anyObjectInUse(pendingCallsInUseObject, delayedTransport)) {
1✔
450
      exitIdleMode();
1✔
451
    }
452
  }
1✔
453

454
  // Must be run from syncContext
455
  private void cancelIdleTimer(boolean permanent) {
456
    idleTimer.cancel(permanent);
1✔
457
  }
1✔
458

459
  // Always run from syncContext
460
  private void rescheduleIdleTimer() {
461
    if (idleTimeoutMillis == IDLE_TIMEOUT_MILLIS_DISABLE) {
1✔
462
      return;
1✔
463
    }
464
    idleTimer.reschedule(idleTimeoutMillis, TimeUnit.MILLISECONDS);
1✔
465
  }
1✔
466

467
  /**
468
   * Force name resolution refresh to happen immediately. Must be run
469
   * from syncContext.
470
   */
471
  private void refreshNameResolution() {
472
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
473
    if (nameResolverStarted) {
1✔
474
      nameResolver.refresh();
1✔
475
    }
476
  }
1✔
477

478
  private final class ChannelStreamProvider implements ClientStreamProvider {
1✔
479
    volatile Throttle throttle;
480

481
    private ClientTransport getTransport(PickSubchannelArgs args) {
482
      SubchannelPicker pickerCopy = subchannelPicker;
1✔
483
      if (shutdown.get()) {
1✔
484
        // If channel is shut down, delayedTransport is also shut down which will fail the stream
485
        // properly.
486
        return delayedTransport;
1✔
487
      }
488
      if (pickerCopy == null) {
1✔
489
        final class ExitIdleModeForTransport implements Runnable {
1✔
490
          @Override
491
          public void run() {
492
            exitIdleMode();
1✔
493
          }
1✔
494
        }
495

496
        syncContext.execute(new ExitIdleModeForTransport());
1✔
497
        return delayedTransport;
1✔
498
      }
499
      // There is no need to reschedule the idle timer here.
500
      //
501
      // pickerCopy != null, which means idle timer has not expired when this method starts.
502
      // Even if idle timer expires right after we grab pickerCopy, and it shuts down LoadBalancer
503
      // which calls Subchannel.shutdown(), the InternalSubchannel will be actually shutdown after
504
      // SUBCHANNEL_SHUTDOWN_DELAY_SECONDS, which gives the caller time to start RPC on it.
505
      //
506
      // In most cases the idle timer is scheduled to fire after the transport has created the
507
      // stream, which would have reported in-use state to the channel that would have cancelled
508
      // the idle timer.
509
      PickResult pickResult = pickerCopy.pickSubchannel(args);
1✔
510
      ClientTransport transport = GrpcUtil.getTransportFromPickResult(
1✔
511
          pickResult, args.getCallOptions().isWaitForReady());
1✔
512
      if (transport != null) {
1✔
513
        return transport;
1✔
514
      }
515
      return delayedTransport;
1✔
516
    }
517

518
    @Override
519
    public ClientStream newStream(
520
        final MethodDescriptor<?, ?> method,
521
        final CallOptions callOptions,
522
        final Metadata headers,
523
        final Context context) {
524
      if (!retryEnabled) {
1✔
525
        ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers(
1✔
526
            callOptions, headers, 0, /* isTransparentRetry= */ false);
527
        ClientTransport transport = getTransport(new PickSubchannelArgsImpl(
1✔
528
            method, headers, callOptions, new PickDetailsConsumerImpl(tracers)));
529
        Context origContext = context.attach();
1✔
530
        try {
531
          return transport.newStream(method, headers, callOptions, tracers);
1✔
532
        } finally {
533
          context.detach(origContext);
1✔
534
        }
535
      } else {
536
        MethodInfo methodInfo = callOptions.getOption(MethodInfo.KEY);
1✔
537
        final RetryPolicy retryPolicy = methodInfo == null ? null : methodInfo.retryPolicy;
1✔
538
        final HedgingPolicy hedgingPolicy = methodInfo == null ? null : methodInfo.hedgingPolicy;
1✔
539
        final class RetryStream<ReqT> extends RetriableStream<ReqT> {
540
          @SuppressWarnings("unchecked")
541
          RetryStream() {
1✔
542
            super(
1✔
543
                (MethodDescriptor<ReqT, ?>) method,
544
                headers,
545
                channelBufferUsed,
1✔
546
                perRpcBufferLimit,
1✔
547
                channelBufferLimit,
1✔
548
                getCallExecutor(callOptions),
1✔
549
                transportFactory.getScheduledExecutorService(),
1✔
550
                retryPolicy,
551
                hedgingPolicy,
552
                throttle);
553
          }
1✔
554

555
          @Override
556
          Status prestart() {
557
            return uncommittedRetriableStreamsRegistry.add(this);
1✔
558
          }
559

560
          @Override
561
          void postCommit() {
562
            uncommittedRetriableStreamsRegistry.remove(this);
1✔
563
          }
1✔
564

565
          @Override
566
          ClientStream newSubstream(
567
              Metadata newHeaders, ClientStreamTracer.Factory factory, int previousAttempts,
568
              boolean isTransparentRetry) {
569
            CallOptions newOptions = callOptions.withStreamTracerFactory(factory);
1✔
570
            ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers(
1✔
571
                newOptions, newHeaders, previousAttempts, isTransparentRetry);
572
            ClientTransport transport = getTransport(new PickSubchannelArgsImpl(
1✔
573
                method, newHeaders, newOptions, new PickDetailsConsumerImpl(tracers)));
574
            Context origContext = context.attach();
1✔
575
            try {
576
              return transport.newStream(method, newHeaders, newOptions, tracers);
1✔
577
            } finally {
578
              context.detach(origContext);
1✔
579
            }
580
          }
581
        }
582

583
        return new RetryStream<>();
1✔
584
      }
585
    }
586
  }
587

588
  private final ChannelStreamProvider transportProvider = new ChannelStreamProvider();
1✔
589

590
  private final Rescheduler idleTimer;
591

592
  ManagedChannelImpl(
593
      ManagedChannelImplBuilder builder,
594
      ClientTransportFactory clientTransportFactory,
595
      BackoffPolicy.Provider backoffPolicyProvider,
596
      ObjectPool<? extends Executor> balancerRpcExecutorPool,
597
      Supplier<Stopwatch> stopwatchSupplier,
598
      List<ClientInterceptor> interceptors,
599
      final TimeProvider timeProvider) {
1✔
600
    this.target = checkNotNull(builder.target, "target");
1✔
601
    this.logId = InternalLogId.allocate("Channel", target);
1✔
602
    this.timeProvider = checkNotNull(timeProvider, "timeProvider");
1✔
603
    this.executorPool = checkNotNull(builder.executorPool, "executorPool");
1✔
604
    this.executor = checkNotNull(executorPool.getObject(), "executor");
1✔
605
    this.originalChannelCreds = builder.channelCredentials;
1✔
606
    this.originalTransportFactory = clientTransportFactory;
1✔
607
    this.offloadExecutorHolder =
1✔
608
        new ExecutorHolder(checkNotNull(builder.offloadExecutorPool, "offloadExecutorPool"));
1✔
609
    this.transportFactory = new CallCredentialsApplyingTransportFactory(
1✔
610
        clientTransportFactory, builder.callCredentials, this.offloadExecutorHolder);
611
    this.oobTransportFactory = new CallCredentialsApplyingTransportFactory(
1✔
612
        clientTransportFactory, null, this.offloadExecutorHolder);
613
    this.scheduledExecutor =
1✔
614
        new RestrictedScheduledExecutor(transportFactory.getScheduledExecutorService());
1✔
615
    maxTraceEvents = builder.maxTraceEvents;
1✔
616
    channelTracer = new ChannelTracer(
1✔
617
        logId, builder.maxTraceEvents, timeProvider.currentTimeNanos(),
1✔
618
        "Channel for '" + target + "'");
619
    channelLogger = new ChannelLoggerImpl(channelTracer, timeProvider);
1✔
620
    ProxyDetector proxyDetector =
621
        builder.proxyDetector != null ? builder.proxyDetector : GrpcUtil.DEFAULT_PROXY_DETECTOR;
1✔
622
    this.retryEnabled = builder.retryEnabled;
1✔
623
    this.loadBalancerFactory = new AutoConfiguredLoadBalancerFactory(builder.defaultLbPolicy);
1✔
624
    this.nameResolverRegistry = builder.nameResolverRegistry;
1✔
625
    ResolvedNameResolver resolvedResolver = getNameResolverProvider(
1✔
626
        target, nameResolverRegistry, transportFactory.getSupportedSocketAddressTypes());
1✔
627
    this.targetUri = resolvedResolver.targetUri;
1✔
628
    this.nameResolverProvider = resolvedResolver.provider;
1✔
629
    ScParser serviceConfigParser =
1✔
630
        new ScParser(
631
            retryEnabled,
632
            builder.maxRetryAttempts,
633
            builder.maxHedgedAttempts,
634
            loadBalancerFactory);
635
    this.authorityOverride = builder.authorityOverride;
1✔
636
    this.nameResolverArgs =
1✔
637
        NameResolver.Args.newBuilder()
1✔
638
            .setDefaultPort(builder.getDefaultPort())
1✔
639
            .setProxyDetector(proxyDetector)
1✔
640
            .setSynchronizationContext(syncContext)
1✔
641
            .setScheduledExecutorService(scheduledExecutor)
1✔
642
            .setServiceConfigParser(serviceConfigParser)
1✔
643
            .setChannelLogger(channelLogger)
1✔
644
            .setOffloadExecutor(this.offloadExecutorHolder)
1✔
645
            .setOverrideAuthority(this.authorityOverride)
1✔
646
            .build();
1✔
647
    this.nameResolver = getNameResolver(
1✔
648
        targetUri, authorityOverride, nameResolverProvider, nameResolverArgs);
649
    this.balancerRpcExecutorPool = checkNotNull(balancerRpcExecutorPool, "balancerRpcExecutorPool");
1✔
650
    this.balancerRpcExecutorHolder = new ExecutorHolder(balancerRpcExecutorPool);
1✔
651
    this.delayedTransport = new DelayedClientTransport(this.executor, this.syncContext);
1✔
652
    this.delayedTransport.start(delayedTransportListener);
1✔
653
    this.backoffPolicyProvider = backoffPolicyProvider;
1✔
654

655
    if (builder.defaultServiceConfig != null) {
1✔
656
      ConfigOrError parsedDefaultServiceConfig =
1✔
657
          serviceConfigParser.parseServiceConfig(builder.defaultServiceConfig);
1✔
658
      checkState(
1✔
659
          parsedDefaultServiceConfig.getError() == null,
1✔
660
          "Default config is invalid: %s",
661
          parsedDefaultServiceConfig.getError());
1✔
662
      this.defaultServiceConfig =
1✔
663
          (ManagedChannelServiceConfig) parsedDefaultServiceConfig.getConfig();
1✔
664
      this.lastServiceConfig = this.defaultServiceConfig;
1✔
665
    } else {
1✔
666
      this.defaultServiceConfig = null;
1✔
667
    }
668
    this.lookUpServiceConfig = builder.lookUpServiceConfig;
1✔
669
    realChannel = new RealChannel(nameResolver.getServiceAuthority());
1✔
670
    Channel channel = realChannel;
1✔
671
    if (builder.binlog != null) {
1✔
672
      channel = builder.binlog.wrapChannel(channel);
1✔
673
    }
674
    this.interceptorChannel = ClientInterceptors.intercept(channel, interceptors);
1✔
675
    this.transportFilters = new ArrayList<>(builder.transportFilters);
1✔
676
    this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier");
1✔
677
    if (builder.idleTimeoutMillis == IDLE_TIMEOUT_MILLIS_DISABLE) {
1✔
678
      this.idleTimeoutMillis = builder.idleTimeoutMillis;
1✔
679
    } else {
680
      checkArgument(
1✔
681
          builder.idleTimeoutMillis
682
              >= ManagedChannelImplBuilder.IDLE_MODE_MIN_TIMEOUT_MILLIS,
683
          "invalid idleTimeoutMillis %s", builder.idleTimeoutMillis);
684
      this.idleTimeoutMillis = builder.idleTimeoutMillis;
1✔
685
    }
686

687
    idleTimer = new Rescheduler(
1✔
688
        new IdleModeTimer(),
689
        syncContext,
690
        transportFactory.getScheduledExecutorService(),
1✔
691
        stopwatchSupplier.get());
1✔
692
    this.fullStreamDecompression = builder.fullStreamDecompression;
1✔
693
    this.decompressorRegistry = checkNotNull(builder.decompressorRegistry, "decompressorRegistry");
1✔
694
    this.compressorRegistry = checkNotNull(builder.compressorRegistry, "compressorRegistry");
1✔
695
    this.userAgent = builder.userAgent;
1✔
696

697
    this.channelBufferLimit = builder.retryBufferSize;
1✔
698
    this.perRpcBufferLimit = builder.perRpcBufferLimit;
1✔
699
    final class ChannelCallTracerFactory implements CallTracer.Factory {
1✔
700
      @Override
701
      public CallTracer create() {
702
        return new CallTracer(timeProvider);
1✔
703
      }
704
    }
705

706
    this.callTracerFactory = new ChannelCallTracerFactory();
1✔
707
    channelCallTracer = callTracerFactory.create();
1✔
708
    this.channelz = checkNotNull(builder.channelz);
1✔
709
    channelz.addRootChannel(this);
1✔
710

711
    if (!lookUpServiceConfig) {
1✔
712
      if (defaultServiceConfig != null) {
1✔
713
        channelLogger.log(
1✔
714
            ChannelLogLevel.INFO, "Service config look-up disabled, using default service config");
715
      }
716
      serviceConfigUpdated = true;
1✔
717
    }
718
  }
1✔
719

720
  @VisibleForTesting
721
  static class ResolvedNameResolver {
722
    public final URI targetUri;
723
    public final NameResolverProvider provider;
724

725
    public ResolvedNameResolver(URI targetUri, NameResolverProvider provider) {
1✔
726
      this.targetUri = checkNotNull(targetUri, "targetUri");
1✔
727
      this.provider = checkNotNull(provider, "provider");
1✔
728
    }
1✔
729
  }
730

731
  @VisibleForTesting
732
  static ResolvedNameResolver getNameResolverProvider(
733
      String target, NameResolverRegistry nameResolverRegistry,
734
      Collection<Class<? extends SocketAddress>> channelTransportSocketAddressTypes) {
735
    // Finding a NameResolver. Try using the target string as the URI. If that fails, try prepending
736
    // "dns:///".
737
    NameResolverProvider provider = null;
1✔
738
    URI targetUri = null;
1✔
739
    StringBuilder uriSyntaxErrors = new StringBuilder();
1✔
740
    try {
741
      targetUri = new URI(target);
1✔
742
    } catch (URISyntaxException e) {
1✔
743
      // Can happen with ip addresses like "[::1]:1234" or 127.0.0.1:1234.
744
      uriSyntaxErrors.append(e.getMessage());
1✔
745
    }
1✔
746
    if (targetUri != null) {
1✔
747
      // For "localhost:8080" this would likely cause provider to be null, because "localhost" is
748
      // parsed as the scheme. Will hit the next case and try "dns:///localhost:8080".
749
      provider = nameResolverRegistry.getProviderForScheme(targetUri.getScheme());
1✔
750
    }
751

752
    if (provider == null && !URI_PATTERN.matcher(target).matches()) {
1✔
753
      // It doesn't look like a URI target. Maybe it's an authority string. Try with the default
754
      // scheme from the registry.
755
      try {
756
        targetUri = new URI(nameResolverRegistry.getDefaultScheme(), "", "/" + target, null);
1✔
757
      } catch (URISyntaxException e) {
×
758
        // Should not be possible.
759
        throw new IllegalArgumentException(e);
×
760
      }
1✔
761
      provider = nameResolverRegistry.getProviderForScheme(targetUri.getScheme());
1✔
762
    }
763

764
    if (provider == null) {
1✔
765
      throw new IllegalArgumentException(String.format(
1✔
766
          "Could not find a NameResolverProvider for %s%s",
767
          target, uriSyntaxErrors.length() > 0 ? " (" + uriSyntaxErrors + ")" : ""));
1✔
768
    }
769

770
    if (channelTransportSocketAddressTypes != null) {
1✔
771
      Collection<Class<? extends SocketAddress>> nameResolverSocketAddressTypes
1✔
772
          = provider.getProducedSocketAddressTypes();
1✔
773
      if (!channelTransportSocketAddressTypes.containsAll(nameResolverSocketAddressTypes)) {
1✔
774
        throw new IllegalArgumentException(String.format(
1✔
775
            "Address types of NameResolver '%s' for '%s' not supported by transport",
776
            targetUri.getScheme(), target));
1✔
777
      }
778
    }
779

780
    return new ResolvedNameResolver(targetUri, provider);
1✔
781
  }
782

783
  @VisibleForTesting
784
  static NameResolver getNameResolver(
785
      URI targetUri, @Nullable final String overrideAuthority,
786
      NameResolverProvider provider, NameResolver.Args nameResolverArgs) {
787
    NameResolver resolver = provider.newNameResolver(targetUri, nameResolverArgs);
1✔
788
    if (resolver == null) {
1✔
789
      throw new IllegalArgumentException("cannot create a NameResolver for " + targetUri);
1✔
790
    }
791

792
    // We wrap the name resolver in a RetryingNameResolver to give it the ability to retry failures.
793
    // TODO: After a transition period, all NameResolver implementations that need retry should use
794
    //       RetryingNameResolver directly and this step can be removed.
795
    NameResolver usedNameResolver = new RetryingNameResolver(resolver,
1✔
796
          new BackoffPolicyRetryScheduler(new ExponentialBackoffPolicy.Provider(),
797
              nameResolverArgs.getScheduledExecutorService(),
1✔
798
              nameResolverArgs.getSynchronizationContext()),
1✔
799
          nameResolverArgs.getSynchronizationContext());
1✔
800

801
    if (overrideAuthority == null) {
1✔
802
      return usedNameResolver;
1✔
803
    }
804

805
    return new ForwardingNameResolver(usedNameResolver) {
1✔
806
      @Override
807
      public String getServiceAuthority() {
808
        return overrideAuthority;
1✔
809
      }
810
    };
811
  }
812

813
  @VisibleForTesting
814
  InternalConfigSelector getConfigSelector() {
815
    return realChannel.configSelector.get();
1✔
816
  }
817

818
  /**
819
   * Initiates an orderly shutdown in which preexisting calls continue but new calls are immediately
820
   * cancelled.
821
   */
822
  @Override
823
  public ManagedChannelImpl shutdown() {
824
    channelLogger.log(ChannelLogLevel.DEBUG, "shutdown() called");
1✔
825
    if (!shutdown.compareAndSet(false, true)) {
1✔
826
      return this;
1✔
827
    }
828
    final class Shutdown implements Runnable {
1✔
829
      @Override
830
      public void run() {
831
        channelLogger.log(ChannelLogLevel.INFO, "Entering SHUTDOWN state");
1✔
832
        channelStateManager.gotoState(SHUTDOWN);
1✔
833
      }
1✔
834
    }
835

836
    syncContext.execute(new Shutdown());
1✔
837
    realChannel.shutdown();
1✔
838
    final class CancelIdleTimer implements Runnable {
1✔
839
      @Override
840
      public void run() {
841
        cancelIdleTimer(/* permanent= */ true);
1✔
842
      }
1✔
843
    }
844

845
    syncContext.execute(new CancelIdleTimer());
1✔
846
    return this;
1✔
847
  }
848

849
  /**
850
   * Initiates a forceful shutdown in which preexisting and new calls are cancelled. Although
851
   * forceful, the shutdown process is still not instantaneous; {@link #isTerminated()} will likely
852
   * return {@code false} immediately after this method returns.
853
   */
854
  @Override
855
  public ManagedChannelImpl shutdownNow() {
856
    channelLogger.log(ChannelLogLevel.DEBUG, "shutdownNow() called");
1✔
857
    shutdown();
1✔
858
    realChannel.shutdownNow();
1✔
859
    final class ShutdownNow implements Runnable {
1✔
860
      @Override
861
      public void run() {
862
        if (shutdownNowed) {
1✔
863
          return;
1✔
864
        }
865
        shutdownNowed = true;
1✔
866
        maybeShutdownNowSubchannels();
1✔
867
      }
1✔
868
    }
869

870
    syncContext.execute(new ShutdownNow());
1✔
871
    return this;
1✔
872
  }
873

874
  // Called from syncContext
875
  @VisibleForTesting
876
  void panic(final Throwable t) {
877
    if (panicMode) {
1✔
878
      // Preserve the first panic information
879
      return;
×
880
    }
881
    panicMode = true;
1✔
882
    cancelIdleTimer(/* permanent= */ true);
1✔
883
    shutdownNameResolverAndLoadBalancer(false);
1✔
884
    final class PanicSubchannelPicker extends SubchannelPicker {
1✔
885
      private final PickResult panicPickResult =
1✔
886
          PickResult.withDrop(
1✔
887
              Status.INTERNAL.withDescription("Panic! This is a bug!").withCause(t));
1✔
888

889
      @Override
890
      public PickResult pickSubchannel(PickSubchannelArgs args) {
891
        return panicPickResult;
1✔
892
      }
893

894
      @Override
895
      public String toString() {
896
        return MoreObjects.toStringHelper(PanicSubchannelPicker.class)
×
897
            .add("panicPickResult", panicPickResult)
×
898
            .toString();
×
899
      }
900
    }
901

902
    updateSubchannelPicker(new PanicSubchannelPicker());
1✔
903
    realChannel.updateConfigSelector(null);
1✔
904
    channelLogger.log(ChannelLogLevel.ERROR, "PANIC! Entering TRANSIENT_FAILURE");
1✔
905
    channelStateManager.gotoState(TRANSIENT_FAILURE);
1✔
906
  }
1✔
907

908
  @VisibleForTesting
909
  boolean isInPanicMode() {
910
    return panicMode;
1✔
911
  }
912

913
  // Called from syncContext
914
  private void updateSubchannelPicker(SubchannelPicker newPicker) {
915
    subchannelPicker = newPicker;
1✔
916
    delayedTransport.reprocess(newPicker);
1✔
917
  }
1✔
918

919
  @Override
920
  public boolean isShutdown() {
921
    return shutdown.get();
1✔
922
  }
923

924
  @Override
925
  public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
926
    return terminatedLatch.await(timeout, unit);
1✔
927
  }
928

929
  @Override
930
  public boolean isTerminated() {
931
    return terminated;
1✔
932
  }
933

934
  /*
935
   * Creates a new outgoing call on the channel.
936
   */
937
  @Override
938
  public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(MethodDescriptor<ReqT, RespT> method,
939
      CallOptions callOptions) {
940
    return interceptorChannel.newCall(method, callOptions);
1✔
941
  }
942

943
  @Override
944
  public String authority() {
945
    return interceptorChannel.authority();
1✔
946
  }
947

948
  private Executor getCallExecutor(CallOptions callOptions) {
949
    Executor executor = callOptions.getExecutor();
1✔
950
    if (executor == null) {
1✔
951
      executor = this.executor;
1✔
952
    }
953
    return executor;
1✔
954
  }
955

956
  private class RealChannel extends Channel {
957
    // Reference to null if no config selector is available from resolution result
958
    // Reference must be set() from syncContext
959
    private final AtomicReference<InternalConfigSelector> configSelector =
1✔
960
        new AtomicReference<>(INITIAL_PENDING_SELECTOR);
1✔
961
    // Set when the NameResolver is initially created. When we create a new NameResolver for the
962
    // same target, the new instance must have the same value.
963
    private final String authority;
964

965
    private final Channel clientCallImplChannel = new Channel() {
1✔
966
      @Override
967
      public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
968
          MethodDescriptor<RequestT, ResponseT> method, CallOptions callOptions) {
969
        return new ClientCallImpl<>(
1✔
970
            method,
971
            getCallExecutor(callOptions),
1✔
972
            callOptions,
973
            transportProvider,
1✔
974
            terminated ? null : transportFactory.getScheduledExecutorService(),
1✔
975
            channelCallTracer,
1✔
976
            null)
977
            .setFullStreamDecompression(fullStreamDecompression)
1✔
978
            .setDecompressorRegistry(decompressorRegistry)
1✔
979
            .setCompressorRegistry(compressorRegistry);
1✔
980
      }
981

982
      @Override
983
      public String authority() {
984
        return authority;
×
985
      }
986
    };
987

988
    private RealChannel(String authority) {
1✔
989
      this.authority =  checkNotNull(authority, "authority");
1✔
990
    }
1✔
991

992
    @Override
993
    public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(
994
        MethodDescriptor<ReqT, RespT> method, CallOptions callOptions) {
995
      if (configSelector.get() != INITIAL_PENDING_SELECTOR) {
1✔
996
        return newClientCall(method, callOptions);
1✔
997
      }
998
      syncContext.execute(new Runnable() {
1✔
999
        @Override
1000
        public void run() {
1001
          exitIdleMode();
1✔
1002
        }
1✔
1003
      });
1004
      if (configSelector.get() != INITIAL_PENDING_SELECTOR) {
1✔
1005
        // This is an optimization for the case (typically with InProcessTransport) when name
1006
        // resolution result is immediately available at this point. Otherwise, some users'
1007
        // tests might observe slight behavior difference from earlier grpc versions.
1008
        return newClientCall(method, callOptions);
1✔
1009
      }
1010
      if (shutdown.get()) {
1✔
1011
        // Return a failing ClientCall.
1012
        return new ClientCall<ReqT, RespT>() {
×
1013
          @Override
1014
          public void start(Listener<RespT> responseListener, Metadata headers) {
1015
            responseListener.onClose(SHUTDOWN_STATUS, new Metadata());
×
1016
          }
×
1017

1018
          @Override public void request(int numMessages) {}
×
1019

1020
          @Override public void cancel(@Nullable String message, @Nullable Throwable cause) {}
×
1021

1022
          @Override public void halfClose() {}
×
1023

1024
          @Override public void sendMessage(ReqT message) {}
×
1025
        };
1026
      }
1027
      Context context = Context.current();
1✔
1028
      final PendingCall<ReqT, RespT> pendingCall = new PendingCall<>(context, method, callOptions);
1✔
1029
      syncContext.execute(new Runnable() {
1✔
1030
        @Override
1031
        public void run() {
1032
          if (configSelector.get() == INITIAL_PENDING_SELECTOR) {
1✔
1033
            if (pendingCalls == null) {
1✔
1034
              pendingCalls = new LinkedHashSet<>();
1✔
1035
              inUseStateAggregator.updateObjectInUse(pendingCallsInUseObject, true);
1✔
1036
            }
1037
            pendingCalls.add(pendingCall);
1✔
1038
          } else {
1039
            pendingCall.reprocess();
1✔
1040
          }
1041
        }
1✔
1042
      });
1043
      return pendingCall;
1✔
1044
    }
1045

1046
    // Must run in SynchronizationContext.
1047
    void updateConfigSelector(@Nullable InternalConfigSelector config) {
1048
      InternalConfigSelector prevConfig = configSelector.get();
1✔
1049
      configSelector.set(config);
1✔
1050
      if (prevConfig == INITIAL_PENDING_SELECTOR && pendingCalls != null) {
1✔
1051
        for (RealChannel.PendingCall<?, ?> pendingCall : pendingCalls) {
1✔
1052
          pendingCall.reprocess();
1✔
1053
        }
1✔
1054
      }
1055
    }
1✔
1056

1057
    // Must run in SynchronizationContext.
1058
    void onConfigError() {
1059
      if (configSelector.get() == INITIAL_PENDING_SELECTOR) {
1✔
1060
        updateConfigSelector(null);
1✔
1061
      }
1062
    }
1✔
1063

1064
    void shutdown() {
1065
      final class RealChannelShutdown implements Runnable {
1✔
1066
        @Override
1067
        public void run() {
1068
          if (pendingCalls == null) {
1✔
1069
            if (configSelector.get() == INITIAL_PENDING_SELECTOR) {
1✔
1070
              configSelector.set(null);
1✔
1071
            }
1072
            uncommittedRetriableStreamsRegistry.onShutdown(SHUTDOWN_STATUS);
1✔
1073
          }
1074
        }
1✔
1075
      }
1076

1077
      syncContext.execute(new RealChannelShutdown());
1✔
1078
    }
1✔
1079

1080
    void shutdownNow() {
1081
      final class RealChannelShutdownNow implements Runnable {
1✔
1082
        @Override
1083
        public void run() {
1084
          if (configSelector.get() == INITIAL_PENDING_SELECTOR) {
1✔
1085
            configSelector.set(null);
1✔
1086
          }
1087
          if (pendingCalls != null) {
1✔
1088
            for (RealChannel.PendingCall<?, ?> pendingCall : pendingCalls) {
1✔
1089
              pendingCall.cancel("Channel is forcefully shutdown", null);
1✔
1090
            }
1✔
1091
          }
1092
          uncommittedRetriableStreamsRegistry.onShutdownNow(SHUTDOWN_NOW_STATUS);
1✔
1093
        }
1✔
1094
      }
1095

1096
      syncContext.execute(new RealChannelShutdownNow());
1✔
1097
    }
1✔
1098

1099
    @Override
1100
    public String authority() {
1101
      return authority;
1✔
1102
    }
1103

1104
    private final class PendingCall<ReqT, RespT> extends DelayedClientCall<ReqT, RespT> {
1105
      final Context context;
1106
      final MethodDescriptor<ReqT, RespT> method;
1107
      final CallOptions callOptions;
1108
      private final long callCreationTime;
1109

1110
      PendingCall(
1111
          Context context, MethodDescriptor<ReqT, RespT> method, CallOptions callOptions) {
1✔
1112
        super(getCallExecutor(callOptions), scheduledExecutor, callOptions.getDeadline());
1✔
1113
        this.context = context;
1✔
1114
        this.method = method;
1✔
1115
        this.callOptions = callOptions;
1✔
1116
        this.callCreationTime = ticker.nanoTime();
1✔
1117
      }
1✔
1118

1119
      /** Called when it's ready to create a real call and reprocess the pending call. */
1120
      void reprocess() {
1121
        ClientCall<ReqT, RespT> realCall;
1122
        Context previous = context.attach();
1✔
1123
        try {
1124
          CallOptions delayResolutionOption = callOptions.withOption(NAME_RESOLUTION_DELAYED,
1✔
1125
              ticker.nanoTime() - callCreationTime);
1✔
1126
          realCall = newClientCall(method, delayResolutionOption);
1✔
1127
        } finally {
1128
          context.detach(previous);
1✔
1129
        }
1130
        Runnable toRun = setCall(realCall);
1✔
1131
        if (toRun == null) {
1✔
1132
          syncContext.execute(new PendingCallRemoval());
1✔
1133
        } else {
1134
          getCallExecutor(callOptions).execute(new Runnable() {
1✔
1135
            @Override
1136
            public void run() {
1137
              toRun.run();
1✔
1138
              syncContext.execute(new PendingCallRemoval());
1✔
1139
            }
1✔
1140
          });
1141
        }
1142
      }
1✔
1143

1144
      @Override
1145
      protected void callCancelled() {
1146
        super.callCancelled();
1✔
1147
        syncContext.execute(new PendingCallRemoval());
1✔
1148
      }
1✔
1149

1150
      final class PendingCallRemoval implements Runnable {
1✔
1151
        @Override
1152
        public void run() {
1153
          if (pendingCalls != null) {
1✔
1154
            pendingCalls.remove(PendingCall.this);
1✔
1155
            if (pendingCalls.isEmpty()) {
1✔
1156
              inUseStateAggregator.updateObjectInUse(pendingCallsInUseObject, false);
1✔
1157
              pendingCalls = null;
1✔
1158
              if (shutdown.get()) {
1✔
1159
                uncommittedRetriableStreamsRegistry.onShutdown(SHUTDOWN_STATUS);
1✔
1160
              }
1161
            }
1162
          }
1163
        }
1✔
1164
      }
1165
    }
1166

1167
    private <ReqT, RespT> ClientCall<ReqT, RespT> newClientCall(
1168
        MethodDescriptor<ReqT, RespT> method, CallOptions callOptions) {
1169
      InternalConfigSelector selector = configSelector.get();
1✔
1170
      if (selector == null) {
1✔
1171
        return clientCallImplChannel.newCall(method, callOptions);
1✔
1172
      }
1173
      if (selector instanceof ServiceConfigConvertedSelector) {
1✔
1174
        MethodInfo methodInfo =
1✔
1175
            ((ServiceConfigConvertedSelector) selector).config.getMethodConfig(method);
1✔
1176
        if (methodInfo != null) {
1✔
1177
          callOptions = callOptions.withOption(MethodInfo.KEY, methodInfo);
1✔
1178
        }
1179
        return clientCallImplChannel.newCall(method, callOptions);
1✔
1180
      }
1181
      return new ConfigSelectingClientCall<>(
1✔
1182
          selector, clientCallImplChannel, executor, method, callOptions);
1✔
1183
    }
1184
  }
1185

1186
  /**
1187
   * A client call for a given channel that applies a given config selector when it starts.
1188
   */
1189
  static final class ConfigSelectingClientCall<ReqT, RespT>
1190
      extends ForwardingClientCall<ReqT, RespT> {
1191

1192
    private final InternalConfigSelector configSelector;
1193
    private final Channel channel;
1194
    private final Executor callExecutor;
1195
    private final MethodDescriptor<ReqT, RespT> method;
1196
    private final Context context;
1197
    private CallOptions callOptions;
1198

1199
    private ClientCall<ReqT, RespT> delegate;
1200

1201
    ConfigSelectingClientCall(
1202
        InternalConfigSelector configSelector, Channel channel, Executor channelExecutor,
1203
        MethodDescriptor<ReqT, RespT> method,
1204
        CallOptions callOptions) {
1✔
1205
      this.configSelector = configSelector;
1✔
1206
      this.channel = channel;
1✔
1207
      this.method = method;
1✔
1208
      this.callExecutor =
1✔
1209
          callOptions.getExecutor() == null ? channelExecutor : callOptions.getExecutor();
1✔
1210
      this.callOptions = callOptions.withExecutor(callExecutor);
1✔
1211
      this.context = Context.current();
1✔
1212
    }
1✔
1213

1214
    @Override
1215
    protected ClientCall<ReqT, RespT> delegate() {
1216
      return delegate;
1✔
1217
    }
1218

1219
    @SuppressWarnings("unchecked")
1220
    @Override
1221
    public void start(Listener<RespT> observer, Metadata headers) {
1222
      PickSubchannelArgs args =
1✔
1223
          new PickSubchannelArgsImpl(method, headers, callOptions, NOOP_PICK_DETAILS_CONSUMER);
1✔
1224
      InternalConfigSelector.Result result = configSelector.selectConfig(args);
1✔
1225
      Status status = result.getStatus();
1✔
1226
      if (!status.isOk()) {
1✔
1227
        executeCloseObserverInContext(observer,
1✔
1228
            GrpcUtil.replaceInappropriateControlPlaneStatus(status));
1✔
1229
        delegate = (ClientCall<ReqT, RespT>) NOOP_CALL;
1✔
1230
        return;
1✔
1231
      }
1232
      ClientInterceptor interceptor = result.getInterceptor();
1✔
1233
      ManagedChannelServiceConfig config = (ManagedChannelServiceConfig) result.getConfig();
1✔
1234
      MethodInfo methodInfo = config.getMethodConfig(method);
1✔
1235
      if (methodInfo != null) {
1✔
1236
        callOptions = callOptions.withOption(MethodInfo.KEY, methodInfo);
1✔
1237
      }
1238
      if (interceptor != null) {
1✔
1239
        delegate = interceptor.interceptCall(method, callOptions, channel);
1✔
1240
      } else {
1241
        delegate = channel.newCall(method, callOptions);
×
1242
      }
1243
      delegate.start(observer, headers);
1✔
1244
    }
1✔
1245

1246
    private void executeCloseObserverInContext(
1247
        final Listener<RespT> observer, final Status status) {
1248
      class CloseInContext extends ContextRunnable {
1249
        CloseInContext() {
1✔
1250
          super(context);
1✔
1251
        }
1✔
1252

1253
        @Override
1254
        public void runInContext() {
1255
          observer.onClose(status, new Metadata());
1✔
1256
        }
1✔
1257
      }
1258

1259
      callExecutor.execute(new CloseInContext());
1✔
1260
    }
1✔
1261

1262
    @Override
1263
    public void cancel(@Nullable String message, @Nullable Throwable cause) {
1264
      if (delegate != null) {
×
1265
        delegate.cancel(message, cause);
×
1266
      }
1267
    }
×
1268
  }
1269

1270
  private static final ClientCall<Object, Object> NOOP_CALL = new ClientCall<Object, Object>() {
1✔
1271
    @Override
1272
    public void start(Listener<Object> responseListener, Metadata headers) {}
×
1273

1274
    @Override
1275
    public void request(int numMessages) {}
1✔
1276

1277
    @Override
1278
    public void cancel(String message, Throwable cause) {}
×
1279

1280
    @Override
1281
    public void halfClose() {}
×
1282

1283
    @Override
1284
    public void sendMessage(Object message) {}
×
1285

1286
    // Always returns {@code false}, since this is only used when the startup of the call fails.
1287
    @Override
1288
    public boolean isReady() {
1289
      return false;
×
1290
    }
1291
  };
1292

1293
  /**
1294
   * Terminate the channel if termination conditions are met.
1295
   */
1296
  // Must be run from syncContext
1297
  private void maybeTerminateChannel() {
1298
    if (terminated) {
1✔
1299
      return;
×
1300
    }
1301
    if (shutdown.get() && subchannels.isEmpty() && oobChannels.isEmpty()) {
1✔
1302
      channelLogger.log(ChannelLogLevel.INFO, "Terminated");
1✔
1303
      channelz.removeRootChannel(this);
1✔
1304
      executorPool.returnObject(executor);
1✔
1305
      balancerRpcExecutorHolder.release();
1✔
1306
      offloadExecutorHolder.release();
1✔
1307
      // Release the transport factory so that it can deallocate any resources.
1308
      transportFactory.close();
1✔
1309

1310
      terminated = true;
1✔
1311
      terminatedLatch.countDown();
1✔
1312
    }
1313
  }
1✔
1314

1315
  // Must be called from syncContext
1316
  private void handleInternalSubchannelState(ConnectivityStateInfo newState) {
1317
    if (newState.getState() == TRANSIENT_FAILURE || newState.getState() == IDLE) {
1✔
1318
      refreshNameResolution();
1✔
1319
    }
1320
  }
1✔
1321

1322
  @Override
1323
  @SuppressWarnings("deprecation")
1324
  public ConnectivityState getState(boolean requestConnection) {
1325
    ConnectivityState savedChannelState = channelStateManager.getState();
1✔
1326
    if (requestConnection && savedChannelState == IDLE) {
1✔
1327
      final class RequestConnection implements Runnable {
1✔
1328
        @Override
1329
        public void run() {
1330
          exitIdleMode();
1✔
1331
          if (subchannelPicker != null) {
1✔
1332
            subchannelPicker.requestConnection();
1✔
1333
          }
1334
          if (lbHelper != null) {
1✔
1335
            lbHelper.lb.requestConnection();
1✔
1336
          }
1337
        }
1✔
1338
      }
1339

1340
      syncContext.execute(new RequestConnection());
1✔
1341
    }
1342
    return savedChannelState;
1✔
1343
  }
1344

1345
  @Override
1346
  public void notifyWhenStateChanged(final ConnectivityState source, final Runnable callback) {
1347
    final class NotifyStateChanged implements Runnable {
1✔
1348
      @Override
1349
      public void run() {
1350
        channelStateManager.notifyWhenStateChanged(callback, executor, source);
1✔
1351
      }
1✔
1352
    }
1353

1354
    syncContext.execute(new NotifyStateChanged());
1✔
1355
  }
1✔
1356

1357
  @Override
1358
  public void resetConnectBackoff() {
1359
    final class ResetConnectBackoff implements Runnable {
1✔
1360
      @Override
1361
      public void run() {
1362
        if (shutdown.get()) {
1✔
1363
          return;
1✔
1364
        }
1365
        if (nameResolverStarted) {
1✔
1366
          refreshNameResolution();
1✔
1367
        }
1368
        for (InternalSubchannel subchannel : subchannels) {
1✔
1369
          subchannel.resetConnectBackoff();
1✔
1370
        }
1✔
1371
        for (OobChannel oobChannel : oobChannels) {
1✔
1372
          oobChannel.resetConnectBackoff();
×
1373
        }
×
1374
      }
1✔
1375
    }
1376

1377
    syncContext.execute(new ResetConnectBackoff());
1✔
1378
  }
1✔
1379

1380
  @Override
1381
  public void enterIdle() {
1382
    final class PrepareToLoseNetworkRunnable implements Runnable {
1✔
1383
      @Override
1384
      public void run() {
1385
        if (shutdown.get() || lbHelper == null) {
1✔
1386
          return;
1✔
1387
        }
1388
        cancelIdleTimer(/* permanent= */ false);
1✔
1389
        enterIdleMode();
1✔
1390
      }
1✔
1391
    }
1392

1393
    syncContext.execute(new PrepareToLoseNetworkRunnable());
1✔
1394
  }
1✔
1395

1396
  /**
1397
   * A registry that prevents channel shutdown from killing existing retry attempts that are in
1398
   * backoff.
1399
   */
1400
  private final class UncommittedRetriableStreamsRegistry {
1✔
1401
    // TODO(zdapeng): This means we would acquire a lock for each new retry-able stream,
1402
    // it's worthwhile to look for a lock-free approach.
1403
    final Object lock = new Object();
1✔
1404

1405
    @GuardedBy("lock")
1✔
1406
    Collection<ClientStream> uncommittedRetriableStreams = new HashSet<>();
1407

1408
    @GuardedBy("lock")
1409
    Status shutdownStatus;
1410

1411
    void onShutdown(Status reason) {
1412
      boolean shouldShutdownDelayedTransport = false;
1✔
1413
      synchronized (lock) {
1✔
1414
        if (shutdownStatus != null) {
1✔
1415
          return;
1✔
1416
        }
1417
        shutdownStatus = reason;
1✔
1418
        // Keep the delayedTransport open until there is no more uncommitted streams, b/c those
1419
        // retriable streams, which may be in backoff and not using any transport, are already
1420
        // started RPCs.
1421
        if (uncommittedRetriableStreams.isEmpty()) {
1✔
1422
          shouldShutdownDelayedTransport = true;
1✔
1423
        }
1424
      }
1✔
1425

1426
      if (shouldShutdownDelayedTransport) {
1✔
1427
        delayedTransport.shutdown(reason);
1✔
1428
      }
1429
    }
1✔
1430

1431
    void onShutdownNow(Status reason) {
1432
      onShutdown(reason);
1✔
1433
      Collection<ClientStream> streams;
1434

1435
      synchronized (lock) {
1✔
1436
        streams = new ArrayList<>(uncommittedRetriableStreams);
1✔
1437
      }
1✔
1438

1439
      for (ClientStream stream : streams) {
1✔
1440
        stream.cancel(reason);
1✔
1441
      }
1✔
1442
      delayedTransport.shutdownNow(reason);
1✔
1443
    }
1✔
1444

1445
    /**
1446
     * Registers a RetriableStream and return null if not shutdown, otherwise just returns the
1447
     * shutdown Status.
1448
     */
1449
    @Nullable
1450
    Status add(RetriableStream<?> retriableStream) {
1451
      synchronized (lock) {
1✔
1452
        if (shutdownStatus != null) {
1✔
1453
          return shutdownStatus;
1✔
1454
        }
1455
        uncommittedRetriableStreams.add(retriableStream);
1✔
1456
        return null;
1✔
1457
      }
1458
    }
1459

1460
    void remove(RetriableStream<?> retriableStream) {
1461
      Status shutdownStatusCopy = null;
1✔
1462

1463
      synchronized (lock) {
1✔
1464
        uncommittedRetriableStreams.remove(retriableStream);
1✔
1465
        if (uncommittedRetriableStreams.isEmpty()) {
1✔
1466
          shutdownStatusCopy = shutdownStatus;
1✔
1467
          // Because retriable transport is long-lived, we take this opportunity to down-size the
1468
          // hashmap.
1469
          uncommittedRetriableStreams = new HashSet<>();
1✔
1470
        }
1471
      }
1✔
1472

1473
      if (shutdownStatusCopy != null) {
1✔
1474
        delayedTransport.shutdown(shutdownStatusCopy);
1✔
1475
      }
1476
    }
1✔
1477
  }
1478

1479
  private final class LbHelperImpl extends LoadBalancer.Helper {
1✔
1480
    AutoConfiguredLoadBalancer lb;
1481

1482
    @Override
1483
    public AbstractSubchannel createSubchannel(CreateSubchannelArgs args) {
1484
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1485
      // No new subchannel should be created after load balancer has been shutdown.
1486
      checkState(!terminating, "Channel is being terminated");
1✔
1487
      return new SubchannelImpl(args);
1✔
1488
    }
1489

1490
    @Override
1491
    public void updateBalancingState(
1492
        final ConnectivityState newState, final SubchannelPicker newPicker) {
1493
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1494
      checkNotNull(newState, "newState");
1✔
1495
      checkNotNull(newPicker, "newPicker");
1✔
1496
      final class UpdateBalancingState implements Runnable {
1✔
1497
        @Override
1498
        public void run() {
1499
          if (LbHelperImpl.this != lbHelper) {
1✔
1500
            return;
1✔
1501
          }
1502
          updateSubchannelPicker(newPicker);
1✔
1503
          // It's not appropriate to report SHUTDOWN state from lb.
1504
          // Ignore the case of newState == SHUTDOWN for now.
1505
          if (newState != SHUTDOWN) {
1✔
1506
            channelLogger.log(
1✔
1507
                ChannelLogLevel.INFO, "Entering {0} state with picker: {1}", newState, newPicker);
1508
            channelStateManager.gotoState(newState);
1✔
1509
          }
1510
        }
1✔
1511
      }
1512

1513
      syncContext.execute(new UpdateBalancingState());
1✔
1514
    }
1✔
1515

1516
    @Override
1517
    public void refreshNameResolution() {
1518
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1519
      final class LoadBalancerRefreshNameResolution implements Runnable {
1✔
1520
        @Override
1521
        public void run() {
1522
          ManagedChannelImpl.this.refreshNameResolution();
1✔
1523
        }
1✔
1524
      }
1525

1526
      syncContext.execute(new LoadBalancerRefreshNameResolution());
1✔
1527
    }
1✔
1528

1529
    @Override
1530
    public ManagedChannel createOobChannel(EquivalentAddressGroup addressGroup, String authority) {
1531
      return createOobChannel(Collections.singletonList(addressGroup), authority);
×
1532
    }
1533

1534
    @Override
1535
    public ManagedChannel createOobChannel(List<EquivalentAddressGroup> addressGroup,
1536
        String authority) {
1537
      // TODO(ejona): can we be even stricter? Like terminating?
1538
      checkState(!terminated, "Channel is terminated");
1✔
1539
      long oobChannelCreationTime = timeProvider.currentTimeNanos();
1✔
1540
      InternalLogId oobLogId = InternalLogId.allocate("OobChannel", /*details=*/ null);
1✔
1541
      InternalLogId subchannelLogId =
1✔
1542
          InternalLogId.allocate("Subchannel-OOB", /*details=*/ authority);
1✔
1543
      ChannelTracer oobChannelTracer =
1✔
1544
          new ChannelTracer(
1545
              oobLogId, maxTraceEvents, oobChannelCreationTime,
1✔
1546
              "OobChannel for " + addressGroup);
1547
      final OobChannel oobChannel = new OobChannel(
1✔
1548
          authority, balancerRpcExecutorPool, oobTransportFactory.getScheduledExecutorService(),
1✔
1549
          syncContext, callTracerFactory.create(), oobChannelTracer, channelz, timeProvider);
1✔
1550
      channelTracer.reportEvent(new ChannelTrace.Event.Builder()
1✔
1551
          .setDescription("Child OobChannel created")
1✔
1552
          .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
1✔
1553
          .setTimestampNanos(oobChannelCreationTime)
1✔
1554
          .setChannelRef(oobChannel)
1✔
1555
          .build());
1✔
1556
      ChannelTracer subchannelTracer =
1✔
1557
          new ChannelTracer(subchannelLogId, maxTraceEvents, oobChannelCreationTime,
1✔
1558
              "Subchannel for " + addressGroup);
1559
      ChannelLogger subchannelLogger = new ChannelLoggerImpl(subchannelTracer, timeProvider);
1✔
1560
      final class ManagedOobChannelCallback extends InternalSubchannel.Callback {
1✔
1561
        @Override
1562
        void onTerminated(InternalSubchannel is) {
1563
          oobChannels.remove(oobChannel);
1✔
1564
          channelz.removeSubchannel(is);
1✔
1565
          oobChannel.handleSubchannelTerminated();
1✔
1566
          maybeTerminateChannel();
1✔
1567
        }
1✔
1568

1569
        @Override
1570
        void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) {
1571
          // TODO(chengyuanzhang): change to let LB policies explicitly manage OOB channel's
1572
          //  state and refresh name resolution if necessary.
1573
          handleInternalSubchannelState(newState);
1✔
1574
          oobChannel.handleSubchannelStateChange(newState);
1✔
1575
        }
1✔
1576
      }
1577

1578
      final InternalSubchannel internalSubchannel = new InternalSubchannel(
1✔
1579
          addressGroup,
1580
          authority, userAgent, backoffPolicyProvider, oobTransportFactory,
1✔
1581
          oobTransportFactory.getScheduledExecutorService(), stopwatchSupplier, syncContext,
1✔
1582
          // All callback methods are run from syncContext
1583
          new ManagedOobChannelCallback(),
1584
          channelz,
1✔
1585
          callTracerFactory.create(),
1✔
1586
          subchannelTracer,
1587
          subchannelLogId,
1588
          subchannelLogger,
1589
          transportFilters);
1✔
1590
      oobChannelTracer.reportEvent(new ChannelTrace.Event.Builder()
1✔
1591
          .setDescription("Child Subchannel created")
1✔
1592
          .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
1✔
1593
          .setTimestampNanos(oobChannelCreationTime)
1✔
1594
          .setSubchannelRef(internalSubchannel)
1✔
1595
          .build());
1✔
1596
      channelz.addSubchannel(oobChannel);
1✔
1597
      channelz.addSubchannel(internalSubchannel);
1✔
1598
      oobChannel.setSubchannel(internalSubchannel);
1✔
1599
      final class AddOobChannel implements Runnable {
1✔
1600
        @Override
1601
        public void run() {
1602
          if (terminating) {
1✔
1603
            oobChannel.shutdown();
×
1604
          }
1605
          if (!terminated) {
1✔
1606
            // If channel has not terminated, it will track the subchannel and block termination
1607
            // for it.
1608
            oobChannels.add(oobChannel);
1✔
1609
          }
1610
        }
1✔
1611
      }
1612

1613
      syncContext.execute(new AddOobChannel());
1✔
1614
      return oobChannel;
1✔
1615
    }
1616

1617
    @Deprecated
1618
    @Override
1619
    public ManagedChannelBuilder<?> createResolvingOobChannelBuilder(String target) {
1620
      return createResolvingOobChannelBuilder(target, new DefaultChannelCreds())
1✔
1621
          // Override authority to keep the old behavior.
1622
          // createResolvingOobChannelBuilder(String target) will be deleted soon.
1623
          .overrideAuthority(getAuthority());
1✔
1624
    }
1625

1626
    // TODO(creamsoup) prevent main channel to shutdown if oob channel is not terminated
1627
    // TODO(zdapeng) register the channel as a subchannel of the parent channel in channelz.
1628
    @Override
1629
    public ManagedChannelBuilder<?> createResolvingOobChannelBuilder(
1630
        final String target, final ChannelCredentials channelCreds) {
1631
      checkNotNull(channelCreds, "channelCreds");
1✔
1632

1633
      final class ResolvingOobChannelBuilder
1634
          extends ForwardingChannelBuilder2<ResolvingOobChannelBuilder> {
1635
        final ManagedChannelBuilder<?> delegate;
1636

1637
        ResolvingOobChannelBuilder() {
1✔
1638
          final ClientTransportFactory transportFactory;
1639
          CallCredentials callCredentials;
1640
          if (channelCreds instanceof DefaultChannelCreds) {
1✔
1641
            transportFactory = originalTransportFactory;
1✔
1642
            callCredentials = null;
1✔
1643
          } else {
1644
            SwapChannelCredentialsResult swapResult =
1✔
1645
                originalTransportFactory.swapChannelCredentials(channelCreds);
1✔
1646
            if (swapResult == null) {
1✔
1647
              delegate = Grpc.newChannelBuilder(target, channelCreds);
×
1648
              return;
×
1649
            } else {
1650
              transportFactory = swapResult.transportFactory;
1✔
1651
              callCredentials = swapResult.callCredentials;
1✔
1652
            }
1653
          }
1654
          ClientTransportFactoryBuilder transportFactoryBuilder =
1✔
1655
              new ClientTransportFactoryBuilder() {
1✔
1656
                @Override
1657
                public ClientTransportFactory buildClientTransportFactory() {
1658
                  return transportFactory;
1✔
1659
                }
1660
              };
1661
          delegate = new ManagedChannelImplBuilder(
1✔
1662
              target,
1663
              channelCreds,
1664
              callCredentials,
1665
              transportFactoryBuilder,
1666
              new FixedPortProvider(nameResolverArgs.getDefaultPort()))
1✔
1667
              .nameResolverRegistry(nameResolverRegistry);
1✔
1668
        }
1✔
1669

1670
        @Override
1671
        protected ManagedChannelBuilder<?> delegate() {
1672
          return delegate;
1✔
1673
        }
1674
      }
1675

1676
      checkState(!terminated, "Channel is terminated");
1✔
1677

1678
      @SuppressWarnings("deprecation")
1679
      ResolvingOobChannelBuilder builder = new ResolvingOobChannelBuilder();
1✔
1680

1681
      return builder
1✔
1682
          // TODO(zdapeng): executors should not outlive the parent channel.
1683
          .executor(executor)
1✔
1684
          .offloadExecutor(offloadExecutorHolder.getExecutor())
1✔
1685
          .maxTraceEvents(maxTraceEvents)
1✔
1686
          .proxyDetector(nameResolverArgs.getProxyDetector())
1✔
1687
          .userAgent(userAgent);
1✔
1688
    }
1689

1690
    @Override
1691
    public ChannelCredentials getUnsafeChannelCredentials() {
1692
      if (originalChannelCreds == null) {
×
1693
        return new DefaultChannelCreds();
×
1694
      }
1695
      return originalChannelCreds;
×
1696
    }
1697

1698
    @Override
1699
    public void updateOobChannelAddresses(ManagedChannel channel, EquivalentAddressGroup eag) {
1700
      updateOobChannelAddresses(channel, Collections.singletonList(eag));
×
1701
    }
×
1702

1703
    @Override
1704
    public void updateOobChannelAddresses(ManagedChannel channel,
1705
        List<EquivalentAddressGroup> eag) {
1706
      checkArgument(channel instanceof OobChannel,
1✔
1707
          "channel must have been returned from createOobChannel");
1708
      ((OobChannel) channel).updateAddresses(eag);
1✔
1709
    }
1✔
1710

1711
    @Override
1712
    public String getAuthority() {
1713
      return ManagedChannelImpl.this.authority();
1✔
1714
    }
1715

1716
    @Override
1717
    public String getChannelTarget() {
1718
      return targetUri.toString();
×
1719
    }
1720

1721
    @Override
1722
    public SynchronizationContext getSynchronizationContext() {
1723
      return syncContext;
1✔
1724
    }
1725

1726
    @Override
1727
    public ScheduledExecutorService getScheduledExecutorService() {
1728
      return scheduledExecutor;
1✔
1729
    }
1730

1731
    @Override
1732
    public ChannelLogger getChannelLogger() {
1733
      return channelLogger;
1✔
1734
    }
1735

1736
    @Override
1737
    public NameResolver.Args getNameResolverArgs() {
1738
      return nameResolverArgs;
1✔
1739
    }
1740

1741
    @Override
1742
    public NameResolverRegistry getNameResolverRegistry() {
1743
      return nameResolverRegistry;
1✔
1744
    }
1745

1746
    /**
1747
     * A placeholder for channel creds if user did not specify channel creds for the channel.
1748
     */
1749
    // TODO(zdapeng): get rid of this class and let all ChannelBuilders always provide a non-null
1750
    //     channel creds.
1751
    final class DefaultChannelCreds extends ChannelCredentials {
1✔
1752
      @Override
1753
      public ChannelCredentials withoutBearerTokens() {
1754
        return this;
×
1755
      }
1756
    }
1757
  }
1758

1759
  final class NameResolverListener extends NameResolver.Listener2 {
1760
    final LbHelperImpl helper;
1761
    final NameResolver resolver;
1762

1763
    NameResolverListener(LbHelperImpl helperImpl, NameResolver resolver) {
1✔
1764
      this.helper = checkNotNull(helperImpl, "helperImpl");
1✔
1765
      this.resolver = checkNotNull(resolver, "resolver");
1✔
1766
    }
1✔
1767

1768
    @Override
1769
    public void onResult(final ResolutionResult resolutionResult) {
1770
      final class NamesResolved implements Runnable {
1✔
1771

1772
        @SuppressWarnings("ReferenceEquality")
1773
        @Override
1774
        public void run() {
1775
          if (ManagedChannelImpl.this.nameResolver != resolver) {
1✔
1776
            return;
1✔
1777
          }
1778

1779
          List<EquivalentAddressGroup> servers = resolutionResult.getAddresses();
1✔
1780
          channelLogger.log(
1✔
1781
              ChannelLogLevel.DEBUG,
1782
              "Resolved address: {0}, config={1}",
1783
              servers,
1784
              resolutionResult.getAttributes());
1✔
1785

1786
          if (lastResolutionState != ResolutionState.SUCCESS) {
1✔
1787
            channelLogger.log(ChannelLogLevel.INFO, "Address resolved: {0}", servers);
1✔
1788
            lastResolutionState = ResolutionState.SUCCESS;
1✔
1789
          }
1790

1791
          ConfigOrError configOrError = resolutionResult.getServiceConfig();
1✔
1792
          ResolutionResultListener resolutionResultListener = resolutionResult.getAttributes()
1✔
1793
              .get(RetryingNameResolver.RESOLUTION_RESULT_LISTENER_KEY);
1✔
1794
          InternalConfigSelector resolvedConfigSelector =
1✔
1795
              resolutionResult.getAttributes().get(InternalConfigSelector.KEY);
1✔
1796
          ManagedChannelServiceConfig validServiceConfig =
1797
              configOrError != null && configOrError.getConfig() != null
1✔
1798
                  ? (ManagedChannelServiceConfig) configOrError.getConfig()
1✔
1799
                  : null;
1✔
1800
          Status serviceConfigError = configOrError != null ? configOrError.getError() : null;
1✔
1801

1802
          ManagedChannelServiceConfig effectiveServiceConfig;
1803
          if (!lookUpServiceConfig) {
1✔
1804
            if (validServiceConfig != null) {
1✔
1805
              channelLogger.log(
1✔
1806
                  ChannelLogLevel.INFO,
1807
                  "Service config from name resolver discarded by channel settings");
1808
            }
1809
            effectiveServiceConfig =
1810
                defaultServiceConfig == null ? EMPTY_SERVICE_CONFIG : defaultServiceConfig;
1✔
1811
            if (resolvedConfigSelector != null) {
1✔
1812
              channelLogger.log(
1✔
1813
                  ChannelLogLevel.INFO,
1814
                  "Config selector from name resolver discarded by channel settings");
1815
            }
1816
            realChannel.updateConfigSelector(effectiveServiceConfig.getDefaultConfigSelector());
1✔
1817
          } else {
1818
            // Try to use config if returned from name resolver
1819
            // Otherwise, try to use the default config if available
1820
            if (validServiceConfig != null) {
1✔
1821
              effectiveServiceConfig = validServiceConfig;
1✔
1822
              if (resolvedConfigSelector != null) {
1✔
1823
                realChannel.updateConfigSelector(resolvedConfigSelector);
1✔
1824
                if (effectiveServiceConfig.getDefaultConfigSelector() != null) {
1✔
1825
                  channelLogger.log(
×
1826
                      ChannelLogLevel.DEBUG,
1827
                      "Method configs in service config will be discarded due to presence of"
1828
                          + "config-selector");
1829
                }
1830
              } else {
1831
                realChannel.updateConfigSelector(effectiveServiceConfig.getDefaultConfigSelector());
1✔
1832
              }
1833
            } else if (defaultServiceConfig != null) {
1✔
1834
              effectiveServiceConfig = defaultServiceConfig;
1✔
1835
              realChannel.updateConfigSelector(effectiveServiceConfig.getDefaultConfigSelector());
1✔
1836
              channelLogger.log(
1✔
1837
                  ChannelLogLevel.INFO,
1838
                  "Received no service config, using default service config");
1839
            } else if (serviceConfigError != null) {
1✔
1840
              if (!serviceConfigUpdated) {
1✔
1841
                // First DNS lookup has invalid service config, and cannot fall back to default
1842
                channelLogger.log(
1✔
1843
                    ChannelLogLevel.INFO,
1844
                    "Fallback to error due to invalid first service config without default config");
1845
                // This error could be an "inappropriate" control plane error that should not bleed
1846
                // through to client code using gRPC. We let them flow through here to the LB as
1847
                // we later check for these error codes when investigating pick results in
1848
                // GrpcUtil.getTransportFromPickResult().
1849
                onError(configOrError.getError());
1✔
1850
                if (resolutionResultListener != null) {
1✔
1851
                  resolutionResultListener.resolutionAttempted(configOrError.getError());
1✔
1852
                }
1853
                return;
1✔
1854
              } else {
1855
                effectiveServiceConfig = lastServiceConfig;
1✔
1856
              }
1857
            } else {
1858
              effectiveServiceConfig = EMPTY_SERVICE_CONFIG;
1✔
1859
              realChannel.updateConfigSelector(null);
1✔
1860
            }
1861
            if (!effectiveServiceConfig.equals(lastServiceConfig)) {
1✔
1862
              channelLogger.log(
1✔
1863
                  ChannelLogLevel.INFO,
1864
                  "Service config changed{0}",
1865
                  effectiveServiceConfig == EMPTY_SERVICE_CONFIG ? " to empty" : "");
1✔
1866
              lastServiceConfig = effectiveServiceConfig;
1✔
1867
              transportProvider.throttle = effectiveServiceConfig.getRetryThrottling();
1✔
1868
            }
1869

1870
            try {
1871
              // TODO(creamsoup): when `servers` is empty and lastResolutionStateCopy == SUCCESS
1872
              //  and lbNeedAddress, it shouldn't call the handleServiceConfigUpdate. But,
1873
              //  lbNeedAddress is not deterministic
1874
              serviceConfigUpdated = true;
1✔
1875
            } catch (RuntimeException re) {
×
1876
              logger.log(
×
1877
                  Level.WARNING,
1878
                  "[" + getLogId() + "] Unexpected exception from parsing service config",
×
1879
                  re);
1880
            }
1✔
1881
          }
1882

1883
          Attributes effectiveAttrs = resolutionResult.getAttributes();
1✔
1884
          // Call LB only if it's not shutdown.  If LB is shutdown, lbHelper won't match.
1885
          if (NameResolverListener.this.helper == ManagedChannelImpl.this.lbHelper) {
1✔
1886
            Attributes.Builder attrBuilder =
1✔
1887
                effectiveAttrs.toBuilder().discard(InternalConfigSelector.KEY);
1✔
1888
            Map<String, ?> healthCheckingConfig =
1✔
1889
                effectiveServiceConfig.getHealthCheckingConfig();
1✔
1890
            if (healthCheckingConfig != null) {
1✔
1891
              attrBuilder
1✔
1892
                  .set(LoadBalancer.ATTR_HEALTH_CHECKING_CONFIG, healthCheckingConfig)
1✔
1893
                  .build();
1✔
1894
            }
1895
            Attributes attributes = attrBuilder.build();
1✔
1896

1897
            Status addressAcceptanceStatus = helper.lb.tryAcceptResolvedAddresses(
1✔
1898
                ResolvedAddresses.newBuilder()
1✔
1899
                    .setAddresses(servers)
1✔
1900
                    .setAttributes(attributes)
1✔
1901
                    .setLoadBalancingPolicyConfig(effectiveServiceConfig.getLoadBalancingConfig())
1✔
1902
                    .build());
1✔
1903
            // If a listener is provided, let it know if the addresses were accepted.
1904
            if (resolutionResultListener != null) {
1✔
1905
              resolutionResultListener.resolutionAttempted(addressAcceptanceStatus);
1✔
1906
            }
1907
          }
1908
        }
1✔
1909
      }
1910

1911
      syncContext.execute(new NamesResolved());
1✔
1912
    }
1✔
1913

1914
    @Override
1915
    public void onError(final Status error) {
1916
      checkArgument(!error.isOk(), "the error status must not be OK");
1✔
1917
      final class NameResolverErrorHandler implements Runnable {
1✔
1918
        @Override
1919
        public void run() {
1920
          handleErrorInSyncContext(error);
1✔
1921
        }
1✔
1922
      }
1923

1924
      syncContext.execute(new NameResolverErrorHandler());
1✔
1925
    }
1✔
1926

1927
    private void handleErrorInSyncContext(Status error) {
1928
      logger.log(Level.WARNING, "[{0}] Failed to resolve name. status={1}",
1✔
1929
          new Object[] {getLogId(), error});
1✔
1930
      realChannel.onConfigError();
1✔
1931
      if (lastResolutionState != ResolutionState.ERROR) {
1✔
1932
        channelLogger.log(ChannelLogLevel.WARNING, "Failed to resolve name: {0}", error);
1✔
1933
        lastResolutionState = ResolutionState.ERROR;
1✔
1934
      }
1935
      // Call LB only if it's not shutdown.  If LB is shutdown, lbHelper won't match.
1936
      if (NameResolverListener.this.helper != ManagedChannelImpl.this.lbHelper) {
1✔
1937
        return;
1✔
1938
      }
1939

1940
      helper.lb.handleNameResolutionError(error);
1✔
1941
    }
1✔
1942
  }
1943

1944
  private final class SubchannelImpl extends AbstractSubchannel {
1945
    final CreateSubchannelArgs args;
1946
    final InternalLogId subchannelLogId;
1947
    final ChannelLoggerImpl subchannelLogger;
1948
    final ChannelTracer subchannelTracer;
1949
    List<EquivalentAddressGroup> addressGroups;
1950
    InternalSubchannel subchannel;
1951
    boolean started;
1952
    boolean shutdown;
1953
    ScheduledHandle delayedShutdownTask;
1954

1955
    SubchannelImpl(CreateSubchannelArgs args) {
1✔
1956
      checkNotNull(args, "args");
1✔
1957
      addressGroups = args.getAddresses();
1✔
1958
      if (authorityOverride != null) {
1✔
1959
        List<EquivalentAddressGroup> eagsWithoutOverrideAttr =
1✔
1960
            stripOverrideAuthorityAttributes(args.getAddresses());
1✔
1961
        args = args.toBuilder().setAddresses(eagsWithoutOverrideAttr).build();
1✔
1962
      }
1963
      this.args = args;
1✔
1964
      subchannelLogId = InternalLogId.allocate("Subchannel", /*details=*/ authority());
1✔
1965
      subchannelTracer = new ChannelTracer(
1✔
1966
          subchannelLogId, maxTraceEvents, timeProvider.currentTimeNanos(),
1✔
1967
          "Subchannel for " + args.getAddresses());
1✔
1968
      subchannelLogger = new ChannelLoggerImpl(subchannelTracer, timeProvider);
1✔
1969
    }
1✔
1970

1971
    @Override
1972
    public void start(final SubchannelStateListener listener) {
1973
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1974
      checkState(!started, "already started");
1✔
1975
      checkState(!shutdown, "already shutdown");
1✔
1976
      checkState(!terminating, "Channel is being terminated");
1✔
1977
      started = true;
1✔
1978
      final class ManagedInternalSubchannelCallback extends InternalSubchannel.Callback {
1✔
1979
        // All callbacks are run in syncContext
1980
        @Override
1981
        void onTerminated(InternalSubchannel is) {
1982
          subchannels.remove(is);
1✔
1983
          channelz.removeSubchannel(is);
1✔
1984
          maybeTerminateChannel();
1✔
1985
        }
1✔
1986

1987
        @Override
1988
        void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) {
1989
          checkState(listener != null, "listener is null");
1✔
1990
          listener.onSubchannelState(newState);
1✔
1991
        }
1✔
1992

1993
        @Override
1994
        void onInUse(InternalSubchannel is) {
1995
          inUseStateAggregator.updateObjectInUse(is, true);
1✔
1996
        }
1✔
1997

1998
        @Override
1999
        void onNotInUse(InternalSubchannel is) {
2000
          inUseStateAggregator.updateObjectInUse(is, false);
1✔
2001
        }
1✔
2002
      }
2003

2004
      final InternalSubchannel internalSubchannel = new InternalSubchannel(
1✔
2005
          args.getAddresses(),
1✔
2006
          authority(),
1✔
2007
          userAgent,
1✔
2008
          backoffPolicyProvider,
1✔
2009
          transportFactory,
1✔
2010
          transportFactory.getScheduledExecutorService(),
1✔
2011
          stopwatchSupplier,
1✔
2012
          syncContext,
2013
          new ManagedInternalSubchannelCallback(),
2014
          channelz,
1✔
2015
          callTracerFactory.create(),
1✔
2016
          subchannelTracer,
2017
          subchannelLogId,
2018
          subchannelLogger,
2019
          transportFilters);
1✔
2020

2021
      channelTracer.reportEvent(new ChannelTrace.Event.Builder()
1✔
2022
          .setDescription("Child Subchannel started")
1✔
2023
          .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
1✔
2024
          .setTimestampNanos(timeProvider.currentTimeNanos())
1✔
2025
          .setSubchannelRef(internalSubchannel)
1✔
2026
          .build());
1✔
2027

2028
      this.subchannel = internalSubchannel;
1✔
2029
      channelz.addSubchannel(internalSubchannel);
1✔
2030
      subchannels.add(internalSubchannel);
1✔
2031
    }
1✔
2032

2033
    @Override
2034
    InternalInstrumented<ChannelStats> getInstrumentedInternalSubchannel() {
2035
      checkState(started, "not started");
1✔
2036
      return subchannel;
1✔
2037
    }
2038

2039
    @Override
2040
    public void shutdown() {
2041
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
2042
      if (subchannel == null) {
1✔
2043
        // start() was not successful
2044
        shutdown = true;
×
2045
        return;
×
2046
      }
2047
      if (shutdown) {
1✔
2048
        if (terminating && delayedShutdownTask != null) {
1✔
2049
          // shutdown() was previously called when terminating == false, thus a delayed shutdown()
2050
          // was scheduled.  Now since terminating == true, We should expedite the shutdown.
2051
          delayedShutdownTask.cancel();
×
2052
          delayedShutdownTask = null;
×
2053
          // Will fall through to the subchannel.shutdown() at the end.
2054
        } else {
2055
          return;
1✔
2056
        }
2057
      } else {
2058
        shutdown = true;
1✔
2059
      }
2060
      // Add a delay to shutdown to deal with the race between 1) a transport being picked and
2061
      // newStream() being called on it, and 2) its Subchannel is shut down by LoadBalancer (e.g.,
2062
      // because of address change, or because LoadBalancer is shutdown by Channel entering idle
2063
      // mode). If (2) wins, the app will see a spurious error. We work around this by delaying
2064
      // shutdown of Subchannel for a few seconds here.
2065
      //
2066
      // TODO(zhangkun83): consider a better approach
2067
      // (https://github.com/grpc/grpc-java/issues/2562).
2068
      if (!terminating) {
1✔
2069
        final class ShutdownSubchannel implements Runnable {
1✔
2070
          @Override
2071
          public void run() {
2072
            subchannel.shutdown(SUBCHANNEL_SHUTDOWN_STATUS);
1✔
2073
          }
1✔
2074
        }
2075

2076
        delayedShutdownTask = syncContext.schedule(
1✔
2077
            new LogExceptionRunnable(new ShutdownSubchannel()),
2078
            SUBCHANNEL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS,
2079
            transportFactory.getScheduledExecutorService());
1✔
2080
        return;
1✔
2081
      }
2082
      // When terminating == true, no more real streams will be created. It's safe and also
2083
      // desirable to shutdown timely.
2084
      subchannel.shutdown(SHUTDOWN_STATUS);
1✔
2085
    }
1✔
2086

2087
    @Override
2088
    public void requestConnection() {
2089
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
2090
      checkState(started, "not started");
1✔
2091
      subchannel.obtainActiveTransport();
1✔
2092
    }
1✔
2093

2094
    @Override
2095
    public List<EquivalentAddressGroup> getAllAddresses() {
2096
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
2097
      checkState(started, "not started");
1✔
2098
      return addressGroups;
1✔
2099
    }
2100

2101
    @Override
2102
    public Attributes getAttributes() {
2103
      return args.getAttributes();
1✔
2104
    }
2105

2106
    @Override
2107
    public String toString() {
2108
      return subchannelLogId.toString();
1✔
2109
    }
2110

2111
    @Override
2112
    public Channel asChannel() {
2113
      checkState(started, "not started");
1✔
2114
      return new SubchannelChannel(
1✔
2115
          subchannel, balancerRpcExecutorHolder.getExecutor(),
1✔
2116
          transportFactory.getScheduledExecutorService(),
1✔
2117
          callTracerFactory.create(),
1✔
2118
          new AtomicReference<InternalConfigSelector>(null));
2119
    }
2120

2121
    @Override
2122
    public Object getInternalSubchannel() {
2123
      checkState(started, "Subchannel is not started");
1✔
2124
      return subchannel;
1✔
2125
    }
2126

2127
    @Override
2128
    public ChannelLogger getChannelLogger() {
2129
      return subchannelLogger;
1✔
2130
    }
2131

2132
    @Override
2133
    public void updateAddresses(List<EquivalentAddressGroup> addrs) {
2134
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
2135
      addressGroups = addrs;
1✔
2136
      if (authorityOverride != null) {
1✔
2137
        addrs = stripOverrideAuthorityAttributes(addrs);
1✔
2138
      }
2139
      subchannel.updateAddresses(addrs);
1✔
2140
    }
1✔
2141

2142
    private List<EquivalentAddressGroup> stripOverrideAuthorityAttributes(
2143
        List<EquivalentAddressGroup> eags) {
2144
      List<EquivalentAddressGroup> eagsWithoutOverrideAttr = new ArrayList<>();
1✔
2145
      for (EquivalentAddressGroup eag : eags) {
1✔
2146
        EquivalentAddressGroup eagWithoutOverrideAttr = new EquivalentAddressGroup(
1✔
2147
            eag.getAddresses(),
1✔
2148
            eag.getAttributes().toBuilder().discard(ATTR_AUTHORITY_OVERRIDE).build());
1✔
2149
        eagsWithoutOverrideAttr.add(eagWithoutOverrideAttr);
1✔
2150
      }
1✔
2151
      return Collections.unmodifiableList(eagsWithoutOverrideAttr);
1✔
2152
    }
2153
  }
2154

2155
  @Override
2156
  public String toString() {
2157
    return MoreObjects.toStringHelper(this)
1✔
2158
        .add("logId", logId.getId())
1✔
2159
        .add("target", target)
1✔
2160
        .toString();
1✔
2161
  }
2162

2163
  /**
2164
   * Called from syncContext.
2165
   */
2166
  private final class DelayedTransportListener implements ManagedClientTransport.Listener {
1✔
2167
    @Override
2168
    public void transportShutdown(Status s) {
2169
      checkState(shutdown.get(), "Channel must have been shut down");
1✔
2170
    }
1✔
2171

2172
    @Override
2173
    public void transportReady() {
2174
      // Don't care
2175
    }
×
2176

2177
    @Override
2178
    public Attributes filterTransport(Attributes attributes) {
2179
      return attributes;
×
2180
    }
2181

2182
    @Override
2183
    public void transportInUse(final boolean inUse) {
2184
      inUseStateAggregator.updateObjectInUse(delayedTransport, inUse);
1✔
2185
    }
1✔
2186

2187
    @Override
2188
    public void transportTerminated() {
2189
      checkState(shutdown.get(), "Channel must have been shut down");
1✔
2190
      terminating = true;
1✔
2191
      shutdownNameResolverAndLoadBalancer(false);
1✔
2192
      // No need to call channelStateManager since we are already in SHUTDOWN state.
2193
      // Until LoadBalancer is shutdown, it may still create new subchannels.  We catch them
2194
      // here.
2195
      maybeShutdownNowSubchannels();
1✔
2196
      maybeTerminateChannel();
1✔
2197
    }
1✔
2198
  }
2199

2200
  /**
2201
   * Must be accessed from syncContext.
2202
   */
2203
  private final class IdleModeStateAggregator extends InUseStateAggregator<Object> {
1✔
2204
    @Override
2205
    protected void handleInUse() {
2206
      exitIdleMode();
1✔
2207
    }
1✔
2208

2209
    @Override
2210
    protected void handleNotInUse() {
2211
      if (shutdown.get()) {
1✔
2212
        return;
1✔
2213
      }
2214
      rescheduleIdleTimer();
1✔
2215
    }
1✔
2216
  }
2217

2218
  /**
2219
   * Lazily request for Executor from an executor pool.
2220
   * Also act as an Executor directly to simply run a cmd
2221
   */
2222
  @VisibleForTesting
2223
  static final class ExecutorHolder implements Executor {
2224
    private final ObjectPool<? extends Executor> pool;
2225
    private Executor executor;
2226

2227
    ExecutorHolder(ObjectPool<? extends Executor> executorPool) {
1✔
2228
      this.pool = checkNotNull(executorPool, "executorPool");
1✔
2229
    }
1✔
2230

2231
    synchronized Executor getExecutor() {
2232
      if (executor == null) {
1✔
2233
        executor = checkNotNull(pool.getObject(), "%s.getObject()", executor);
1✔
2234
      }
2235
      return executor;
1✔
2236
    }
2237

2238
    synchronized void release() {
2239
      if (executor != null) {
1✔
2240
        executor = pool.returnObject(executor);
1✔
2241
      }
2242
    }
1✔
2243

2244
    @Override
2245
    public void execute(Runnable command) {
2246
      getExecutor().execute(command);
1✔
2247
    }
1✔
2248
  }
2249

2250
  private static final class RestrictedScheduledExecutor implements ScheduledExecutorService {
2251
    final ScheduledExecutorService delegate;
2252

2253
    private RestrictedScheduledExecutor(ScheduledExecutorService delegate) {
1✔
2254
      this.delegate = checkNotNull(delegate, "delegate");
1✔
2255
    }
1✔
2256

2257
    @Override
2258
    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
2259
      return delegate.schedule(callable, delay, unit);
×
2260
    }
2261

2262
    @Override
2263
    public ScheduledFuture<?> schedule(Runnable cmd, long delay, TimeUnit unit) {
2264
      return delegate.schedule(cmd, delay, unit);
1✔
2265
    }
2266

2267
    @Override
2268
    public ScheduledFuture<?> scheduleAtFixedRate(
2269
        Runnable command, long initialDelay, long period, TimeUnit unit) {
2270
      return delegate.scheduleAtFixedRate(command, initialDelay, period, unit);
×
2271
    }
2272

2273
    @Override
2274
    public ScheduledFuture<?> scheduleWithFixedDelay(
2275
        Runnable command, long initialDelay, long delay, TimeUnit unit) {
2276
      return delegate.scheduleWithFixedDelay(command, initialDelay, delay, unit);
×
2277
    }
2278

2279
    @Override
2280
    public boolean awaitTermination(long timeout, TimeUnit unit)
2281
        throws InterruptedException {
2282
      return delegate.awaitTermination(timeout, unit);
×
2283
    }
2284

2285
    @Override
2286
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
2287
        throws InterruptedException {
2288
      return delegate.invokeAll(tasks);
×
2289
    }
2290

2291
    @Override
2292
    public <T> List<Future<T>> invokeAll(
2293
        Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
2294
        throws InterruptedException {
2295
      return delegate.invokeAll(tasks, timeout, unit);
×
2296
    }
2297

2298
    @Override
2299
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
2300
        throws InterruptedException, ExecutionException {
2301
      return delegate.invokeAny(tasks);
×
2302
    }
2303

2304
    @Override
2305
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
2306
        throws InterruptedException, ExecutionException, TimeoutException {
2307
      return delegate.invokeAny(tasks, timeout, unit);
×
2308
    }
2309

2310
    @Override
2311
    public boolean isShutdown() {
2312
      return delegate.isShutdown();
×
2313
    }
2314

2315
    @Override
2316
    public boolean isTerminated() {
2317
      return delegate.isTerminated();
×
2318
    }
2319

2320
    @Override
2321
    public void shutdown() {
2322
      throw new UnsupportedOperationException("Restricted: shutdown() is not allowed");
1✔
2323
    }
2324

2325
    @Override
2326
    public List<Runnable> shutdownNow() {
2327
      throw new UnsupportedOperationException("Restricted: shutdownNow() is not allowed");
1✔
2328
    }
2329

2330
    @Override
2331
    public <T> Future<T> submit(Callable<T> task) {
2332
      return delegate.submit(task);
×
2333
    }
2334

2335
    @Override
2336
    public Future<?> submit(Runnable task) {
2337
      return delegate.submit(task);
×
2338
    }
2339

2340
    @Override
2341
    public <T> Future<T> submit(Runnable task, T result) {
2342
      return delegate.submit(task, result);
×
2343
    }
2344

2345
    @Override
2346
    public void execute(Runnable command) {
2347
      delegate.execute(command);
×
2348
    }
×
2349
  }
2350

2351
  /**
2352
   * A ResolutionState indicates the status of last name resolution.
2353
   */
2354
  enum ResolutionState {
1✔
2355
    NO_RESOLUTION,
1✔
2356
    SUCCESS,
1✔
2357
    ERROR
1✔
2358
  }
2359
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc