• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19776

09 Apr 2025 07:51PM UTC coverage: 88.586% (-0.01%) from 88.598%
#19776

push

github

ejona86
api: Remove deprecated SubchannelPicker.requestConnection()

It has been deprecated since cec9ee368, six years ago. It was replaced
with LoadBalancer.requestConnection().

34731 of 39206 relevant lines covered (88.59%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.68
/../core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
1
/*
2
 * Copyright 2016 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static com.google.common.base.Preconditions.checkState;
22
import static io.grpc.ClientStreamTracer.NAME_RESOLUTION_DELAYED;
23
import static io.grpc.ConnectivityState.CONNECTING;
24
import static io.grpc.ConnectivityState.IDLE;
25
import static io.grpc.ConnectivityState.SHUTDOWN;
26
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
27
import static io.grpc.EquivalentAddressGroup.ATTR_AUTHORITY_OVERRIDE;
28

29
import com.google.common.annotations.VisibleForTesting;
30
import com.google.common.base.MoreObjects;
31
import com.google.common.base.Stopwatch;
32
import com.google.common.base.Supplier;
33
import com.google.common.util.concurrent.ListenableFuture;
34
import com.google.common.util.concurrent.SettableFuture;
35
import com.google.errorprone.annotations.concurrent.GuardedBy;
36
import io.grpc.Attributes;
37
import io.grpc.CallCredentials;
38
import io.grpc.CallOptions;
39
import io.grpc.Channel;
40
import io.grpc.ChannelCredentials;
41
import io.grpc.ChannelLogger;
42
import io.grpc.ChannelLogger.ChannelLogLevel;
43
import io.grpc.ClientCall;
44
import io.grpc.ClientInterceptor;
45
import io.grpc.ClientInterceptors;
46
import io.grpc.ClientStreamTracer;
47
import io.grpc.ClientTransportFilter;
48
import io.grpc.CompressorRegistry;
49
import io.grpc.ConnectivityState;
50
import io.grpc.ConnectivityStateInfo;
51
import io.grpc.Context;
52
import io.grpc.Deadline;
53
import io.grpc.DecompressorRegistry;
54
import io.grpc.EquivalentAddressGroup;
55
import io.grpc.ForwardingChannelBuilder2;
56
import io.grpc.ForwardingClientCall;
57
import io.grpc.Grpc;
58
import io.grpc.InternalChannelz;
59
import io.grpc.InternalChannelz.ChannelStats;
60
import io.grpc.InternalChannelz.ChannelTrace;
61
import io.grpc.InternalConfigSelector;
62
import io.grpc.InternalInstrumented;
63
import io.grpc.InternalLogId;
64
import io.grpc.InternalWithLogId;
65
import io.grpc.LoadBalancer;
66
import io.grpc.LoadBalancer.CreateSubchannelArgs;
67
import io.grpc.LoadBalancer.PickResult;
68
import io.grpc.LoadBalancer.PickSubchannelArgs;
69
import io.grpc.LoadBalancer.ResolvedAddresses;
70
import io.grpc.LoadBalancer.SubchannelPicker;
71
import io.grpc.LoadBalancer.SubchannelStateListener;
72
import io.grpc.ManagedChannel;
73
import io.grpc.ManagedChannelBuilder;
74
import io.grpc.Metadata;
75
import io.grpc.MethodDescriptor;
76
import io.grpc.MetricInstrumentRegistry;
77
import io.grpc.MetricRecorder;
78
import io.grpc.NameResolver;
79
import io.grpc.NameResolver.ConfigOrError;
80
import io.grpc.NameResolver.ResolutionResult;
81
import io.grpc.NameResolverProvider;
82
import io.grpc.NameResolverRegistry;
83
import io.grpc.ProxyDetector;
84
import io.grpc.Status;
85
import io.grpc.StatusOr;
86
import io.grpc.SynchronizationContext;
87
import io.grpc.SynchronizationContext.ScheduledHandle;
88
import io.grpc.internal.AutoConfiguredLoadBalancerFactory.AutoConfiguredLoadBalancer;
89
import io.grpc.internal.ClientCallImpl.ClientStreamProvider;
90
import io.grpc.internal.ClientTransportFactory.SwapChannelCredentialsResult;
91
import io.grpc.internal.ManagedChannelImplBuilder.ClientTransportFactoryBuilder;
92
import io.grpc.internal.ManagedChannelImplBuilder.FixedPortProvider;
93
import io.grpc.internal.ManagedChannelServiceConfig.MethodInfo;
94
import io.grpc.internal.ManagedChannelServiceConfig.ServiceConfigConvertedSelector;
95
import io.grpc.internal.RetriableStream.ChannelBufferMeter;
96
import io.grpc.internal.RetriableStream.Throttle;
97
import io.grpc.internal.RetryingNameResolver.ResolutionResultListener;
98
import java.net.URI;
99
import java.util.ArrayList;
100
import java.util.Collection;
101
import java.util.Collections;
102
import java.util.HashSet;
103
import java.util.LinkedHashSet;
104
import java.util.List;
105
import java.util.Map;
106
import java.util.Set;
107
import java.util.concurrent.Callable;
108
import java.util.concurrent.CountDownLatch;
109
import java.util.concurrent.ExecutionException;
110
import java.util.concurrent.Executor;
111
import java.util.concurrent.Future;
112
import java.util.concurrent.ScheduledExecutorService;
113
import java.util.concurrent.ScheduledFuture;
114
import java.util.concurrent.TimeUnit;
115
import java.util.concurrent.TimeoutException;
116
import java.util.concurrent.atomic.AtomicBoolean;
117
import java.util.concurrent.atomic.AtomicReference;
118
import java.util.logging.Level;
119
import java.util.logging.Logger;
120
import javax.annotation.Nullable;
121
import javax.annotation.concurrent.ThreadSafe;
122

123
/** A communication channel for making outgoing RPCs. */
124
@ThreadSafe
125
final class ManagedChannelImpl extends ManagedChannel implements
126
    InternalInstrumented<ChannelStats> {
127
  @VisibleForTesting
128
  static final Logger logger = Logger.getLogger(ManagedChannelImpl.class.getName());
1✔
129

130
  static final long IDLE_TIMEOUT_MILLIS_DISABLE = -1;
131

132
  static final long SUBCHANNEL_SHUTDOWN_DELAY_SECONDS = 5;
133

134
  @VisibleForTesting
135
  static final Status SHUTDOWN_NOW_STATUS =
1✔
136
      Status.UNAVAILABLE.withDescription("Channel shutdownNow invoked");
1✔
137

138
  @VisibleForTesting
139
  static final Status SHUTDOWN_STATUS =
1✔
140
      Status.UNAVAILABLE.withDescription("Channel shutdown invoked");
1✔
141

142
  @VisibleForTesting
143
  static final Status SUBCHANNEL_SHUTDOWN_STATUS =
1✔
144
      Status.UNAVAILABLE.withDescription("Subchannel shutdown invoked");
1✔
145

146
  private static final ManagedChannelServiceConfig EMPTY_SERVICE_CONFIG =
147
      ManagedChannelServiceConfig.empty();
1✔
148
  private static final InternalConfigSelector INITIAL_PENDING_SELECTOR =
1✔
149
      new InternalConfigSelector() {
1✔
150
        @Override
151
        public Result selectConfig(PickSubchannelArgs args) {
152
          throw new IllegalStateException("Resolution is pending");
×
153
        }
154
      };
155
  private static final LoadBalancer.PickDetailsConsumer NOOP_PICK_DETAILS_CONSUMER =
1✔
156
      new LoadBalancer.PickDetailsConsumer() {};
1✔
157

158
  private final InternalLogId logId;
159
  private final String target;
160
  @Nullable
161
  private final String authorityOverride;
162
  private final NameResolverRegistry nameResolverRegistry;
163
  private final URI targetUri;
164
  private final NameResolverProvider nameResolverProvider;
165
  private final NameResolver.Args nameResolverArgs;
166
  private final AutoConfiguredLoadBalancerFactory loadBalancerFactory;
167
  private final ClientTransportFactory originalTransportFactory;
168
  @Nullable
169
  private final ChannelCredentials originalChannelCreds;
170
  private final ClientTransportFactory transportFactory;
171
  private final ClientTransportFactory oobTransportFactory;
172
  private final RestrictedScheduledExecutor scheduledExecutor;
173
  private final Executor executor;
174
  private final ObjectPool<? extends Executor> executorPool;
175
  private final ObjectPool<? extends Executor> balancerRpcExecutorPool;
176
  private final ExecutorHolder balancerRpcExecutorHolder;
177
  private final ExecutorHolder offloadExecutorHolder;
178
  private final TimeProvider timeProvider;
179
  private final int maxTraceEvents;
180

181
  @VisibleForTesting
1✔
182
  final SynchronizationContext syncContext = new SynchronizationContext(
183
      new Thread.UncaughtExceptionHandler() {
1✔
184
        @Override
185
        public void uncaughtException(Thread t, Throwable e) {
186
          logger.log(
1✔
187
              Level.SEVERE,
188
              "[" + getLogId() + "] Uncaught exception in the SynchronizationContext. Panic!",
1✔
189
              e);
190
          try {
191
            panic(e);
1✔
192
          } catch (Throwable anotherT) {
×
193
            logger.log(
×
194
                Level.SEVERE, "[" + getLogId() + "] Uncaught exception while panicking", anotherT);
×
195
          }
1✔
196
        }
1✔
197
      });
198

199
  private boolean fullStreamDecompression;
200

201
  private final DecompressorRegistry decompressorRegistry;
202
  private final CompressorRegistry compressorRegistry;
203

204
  private final Supplier<Stopwatch> stopwatchSupplier;
205
  /** The timeout before entering idle mode. */
206
  private final long idleTimeoutMillis;
207

208
  private final ConnectivityStateManager channelStateManager = new ConnectivityStateManager();
1✔
209
  private final BackoffPolicy.Provider backoffPolicyProvider;
210

211
  /**
212
   * We delegate to this channel, so that we can have interceptors as necessary. If there aren't
213
   * any interceptors and the {@link io.grpc.BinaryLog} is {@code null} then this will just be a
214
   * {@link RealChannel}.
215
   */
216
  private final Channel interceptorChannel;
217

218
  private final List<ClientTransportFilter> transportFilters;
219
  @Nullable private final String userAgent;
220

221
  // Only null after channel is terminated. Must be assigned from the syncContext.
222
  private NameResolver nameResolver;
223

224
  // Must be accessed from the syncContext.
225
  private boolean nameResolverStarted;
226

227
  // null when channel is in idle mode.  Must be assigned from syncContext.
228
  @Nullable
229
  private LbHelperImpl lbHelper;
230

231
  // Must be accessed from the syncContext
232
  private boolean panicMode;
233

234
  // Must be mutated from syncContext
235
  // If any monitoring hook to be added later needs to get a snapshot of this Set, we could
236
  // switch to a ConcurrentHashMap.
237
  private final Set<InternalSubchannel> subchannels = new HashSet<>(16, .75f);
1✔
238

239
  // Must be accessed from syncContext
240
  @Nullable
241
  private Collection<RealChannel.PendingCall<?, ?>> pendingCalls;
242
  private final Object pendingCallsInUseObject = new Object();
1✔
243

244
  // Must be mutated from syncContext
245
  private final Set<OobChannel> oobChannels = new HashSet<>(1, .75f);
1✔
246

247
  // reprocess() must be run from syncContext
248
  private final DelayedClientTransport delayedTransport;
249
  private final UncommittedRetriableStreamsRegistry uncommittedRetriableStreamsRegistry
1✔
250
      = new UncommittedRetriableStreamsRegistry();
251

252
  // Shutdown states.
253
  //
254
  // Channel's shutdown process:
255
  // 1. shutdown(): stop accepting new calls from applications
256
  //   1a shutdown <- true
257
  //   1b delayedTransport.shutdown()
258
  // 2. delayedTransport terminated: stop stream-creation functionality
259
  //   2a terminating <- true
260
  //   2b loadBalancer.shutdown()
261
  //     * LoadBalancer will shutdown subchannels and OOB channels
262
  //   2c loadBalancer <- null
263
  //   2d nameResolver.shutdown()
264
  //   2e nameResolver <- null
265
  // 3. All subchannels and OOB channels terminated: Channel considered terminated
266

267
  private final AtomicBoolean shutdown = new AtomicBoolean(false);
1✔
268
  // Must only be mutated and read from syncContext
269
  private boolean shutdownNowed;
270
  // Must only be mutated from syncContext
271
  private boolean terminating;
272
  // Must be mutated from syncContext
273
  private volatile boolean terminated;
274
  private final CountDownLatch terminatedLatch = new CountDownLatch(1);
1✔
275

276
  private final CallTracer.Factory callTracerFactory;
277
  private final CallTracer channelCallTracer;
278
  private final ChannelTracer channelTracer;
279
  private final ChannelLogger channelLogger;
280
  private final InternalChannelz channelz;
281
  private final RealChannel realChannel;
282
  // Must be mutated and read from syncContext
283
  // a flag for doing channel tracing when flipped
284
  private ResolutionState lastResolutionState = ResolutionState.NO_RESOLUTION;
1✔
285
  // Must be mutated and read from constructor or syncContext
286
  // used for channel tracing when value changed
287
  private ManagedChannelServiceConfig lastServiceConfig = EMPTY_SERVICE_CONFIG;
1✔
288

289
  @Nullable
290
  private final ManagedChannelServiceConfig defaultServiceConfig;
291
  // Must be mutated and read from constructor or syncContext
292
  private boolean serviceConfigUpdated = false;
1✔
293
  private final boolean lookUpServiceConfig;
294

295
  // One instance per channel.
296
  private final ChannelBufferMeter channelBufferUsed = new ChannelBufferMeter();
1✔
297

298
  private final long perRpcBufferLimit;
299
  private final long channelBufferLimit;
300

301
  // Temporary false flag that can skip the retry code path.
302
  private final boolean retryEnabled;
303

304
  private final Deadline.Ticker ticker = Deadline.getSystemTicker();
1✔
305

306
  // Called from syncContext
307
  private final ManagedClientTransport.Listener delayedTransportListener =
1✔
308
      new DelayedTransportListener();
309

310
  // Must be called from syncContext
311
  private void maybeShutdownNowSubchannels() {
312
    if (shutdownNowed) {
1✔
313
      for (InternalSubchannel subchannel : subchannels) {
1✔
314
        subchannel.shutdownNow(SHUTDOWN_NOW_STATUS);
1✔
315
      }
1✔
316
      for (OobChannel oobChannel : oobChannels) {
1✔
317
        oobChannel.getInternalSubchannel().shutdownNow(SHUTDOWN_NOW_STATUS);
1✔
318
      }
1✔
319
    }
320
  }
1✔
321

322
  // Must be accessed from syncContext
323
  @VisibleForTesting
1✔
324
  final InUseStateAggregator<Object> inUseStateAggregator = new IdleModeStateAggregator();
325

326
  @Override
327
  public ListenableFuture<ChannelStats> getStats() {
328
    final SettableFuture<ChannelStats> ret = SettableFuture.create();
1✔
329
    final class StatsFetcher implements Runnable {
1✔
330
      @Override
331
      public void run() {
332
        ChannelStats.Builder builder = new InternalChannelz.ChannelStats.Builder();
1✔
333
        channelCallTracer.updateBuilder(builder);
1✔
334
        channelTracer.updateBuilder(builder);
1✔
335
        builder.setTarget(target).setState(channelStateManager.getState());
1✔
336
        List<InternalWithLogId> children = new ArrayList<>();
1✔
337
        children.addAll(subchannels);
1✔
338
        children.addAll(oobChannels);
1✔
339
        builder.setSubchannels(children);
1✔
340
        ret.set(builder.build());
1✔
341
      }
1✔
342
    }
343

344
    // subchannels and oobchannels can only be accessed from syncContext
345
    syncContext.execute(new StatsFetcher());
1✔
346
    return ret;
1✔
347
  }
348

349
  @Override
350
  public InternalLogId getLogId() {
351
    return logId;
1✔
352
  }
353

354
  // Run from syncContext
355
  private class IdleModeTimer implements Runnable {
1✔
356

357
    @Override
358
    public void run() {
359
      // Workaround timer scheduled while in idle mode. This can happen from handleNotInUse() after
360
      // an explicit enterIdleMode() by the user. Protecting here as other locations are a bit too
361
      // subtle to change rapidly to resolve the channel panic. See #8714
362
      if (lbHelper == null) {
1✔
363
        return;
×
364
      }
365
      enterIdleMode();
1✔
366
    }
1✔
367
  }
368

369
  // Must be called from syncContext
370
  private void shutdownNameResolverAndLoadBalancer(boolean channelIsActive) {
371
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
372
    if (channelIsActive) {
1✔
373
      checkState(nameResolverStarted, "nameResolver is not started");
1✔
374
      checkState(lbHelper != null, "lbHelper is null");
1✔
375
    }
376
    if (nameResolver != null) {
1✔
377
      nameResolver.shutdown();
1✔
378
      nameResolverStarted = false;
1✔
379
      if (channelIsActive) {
1✔
380
        nameResolver = getNameResolver(
1✔
381
            targetUri, authorityOverride, nameResolverProvider, nameResolverArgs);
382
      } else {
383
        nameResolver = null;
1✔
384
      }
385
    }
386
    if (lbHelper != null) {
1✔
387
      lbHelper.lb.shutdown();
1✔
388
      lbHelper = null;
1✔
389
    }
390
  }
1✔
391

392
  /**
393
   * Make the channel exit idle mode, if it's in it.
394
   *
395
   * <p>Must be called from syncContext
396
   */
397
  @VisibleForTesting
398
  void exitIdleMode() {
399
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
400
    if (shutdown.get() || panicMode) {
1✔
401
      return;
1✔
402
    }
403
    if (inUseStateAggregator.isInUse()) {
1✔
404
      // Cancel the timer now, so that a racing due timer will not put Channel on idleness
405
      // when the caller of exitIdleMode() is about to use the returned loadBalancer.
406
      cancelIdleTimer(false);
1✔
407
    } else {
408
      // exitIdleMode() may be called outside of inUseStateAggregator.handleNotInUse() while
409
      // isInUse() == false, in which case we still need to schedule the timer.
410
      rescheduleIdleTimer();
1✔
411
    }
412
    if (lbHelper != null) {
1✔
413
      return;
1✔
414
    }
415
    channelLogger.log(ChannelLogLevel.INFO, "Exiting idle mode");
1✔
416
    LbHelperImpl lbHelper = new LbHelperImpl();
1✔
417
    lbHelper.lb = loadBalancerFactory.newLoadBalancer(lbHelper);
1✔
418
    // Delay setting lbHelper until fully initialized, since loadBalancerFactory is user code and
419
    // may throw. We don't want to confuse our state, even if we will enter panic mode.
420
    this.lbHelper = lbHelper;
1✔
421

422
    channelStateManager.gotoState(CONNECTING);
1✔
423
    NameResolverListener listener = new NameResolverListener(lbHelper, nameResolver);
1✔
424
    nameResolver.start(listener);
1✔
425
    nameResolverStarted = true;
1✔
426
  }
1✔
427

428
  // Must be run from syncContext
429
  private void enterIdleMode() {
430
    // nameResolver and loadBalancer are guaranteed to be non-null.  If any of them were null,
431
    // either the idleModeTimer ran twice without exiting the idle mode, or the task in shutdown()
432
    // did not cancel idleModeTimer, or enterIdle() ran while shutdown or in idle, all of
433
    // which are bugs.
434
    shutdownNameResolverAndLoadBalancer(true);
1✔
435
    delayedTransport.reprocess(null);
1✔
436
    channelLogger.log(ChannelLogLevel.INFO, "Entering IDLE state");
1✔
437
    channelStateManager.gotoState(IDLE);
1✔
438
    // If the inUseStateAggregator still considers pending calls to be queued up or the delayed
439
    // transport to be holding some we need to exit idle mode to give these calls a chance to
440
    // be processed.
441
    if (inUseStateAggregator.anyObjectInUse(pendingCallsInUseObject, delayedTransport)) {
1✔
442
      exitIdleMode();
1✔
443
    }
444
  }
1✔
445

446
  // Must be run from syncContext
447
  private void cancelIdleTimer(boolean permanent) {
448
    idleTimer.cancel(permanent);
1✔
449
  }
1✔
450

451
  // Always run from syncContext
452
  private void rescheduleIdleTimer() {
453
    if (idleTimeoutMillis == IDLE_TIMEOUT_MILLIS_DISABLE) {
1✔
454
      return;
1✔
455
    }
456
    idleTimer.reschedule(idleTimeoutMillis, TimeUnit.MILLISECONDS);
1✔
457
  }
1✔
458

459
  /**
460
   * Force name resolution refresh to happen immediately. Must be run
461
   * from syncContext.
462
   */
463
  private void refreshNameResolution() {
464
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
465
    if (nameResolverStarted) {
1✔
466
      nameResolver.refresh();
1✔
467
    }
468
  }
1✔
469

470
  private final class ChannelStreamProvider implements ClientStreamProvider {
1✔
471
    volatile Throttle throttle;
472

473
    @Override
474
    public ClientStream newStream(
475
        final MethodDescriptor<?, ?> method,
476
        final CallOptions callOptions,
477
        final Metadata headers,
478
        final Context context) {
479
      // There is no need to reschedule the idle timer here. If the channel isn't shut down, either
480
      // the delayed transport or a real transport will go in-use and cancel the idle timer.
481
      if (!retryEnabled) {
1✔
482
        ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers(
1✔
483
            callOptions, headers, 0, /* isTransparentRetry= */ false);
484
        Context origContext = context.attach();
1✔
485
        try {
486
          return delayedTransport.newStream(method, headers, callOptions, tracers);
1✔
487
        } finally {
488
          context.detach(origContext);
1✔
489
        }
490
      } else {
491
        MethodInfo methodInfo = callOptions.getOption(MethodInfo.KEY);
1✔
492
        final RetryPolicy retryPolicy = methodInfo == null ? null : methodInfo.retryPolicy;
1✔
493
        final HedgingPolicy hedgingPolicy = methodInfo == null ? null : methodInfo.hedgingPolicy;
1✔
494
        final class RetryStream<ReqT> extends RetriableStream<ReqT> {
495
          @SuppressWarnings("unchecked")
496
          RetryStream() {
1✔
497
            super(
1✔
498
                (MethodDescriptor<ReqT, ?>) method,
499
                headers,
500
                channelBufferUsed,
1✔
501
                perRpcBufferLimit,
1✔
502
                channelBufferLimit,
1✔
503
                getCallExecutor(callOptions),
1✔
504
                transportFactory.getScheduledExecutorService(),
1✔
505
                retryPolicy,
506
                hedgingPolicy,
507
                throttle);
508
          }
1✔
509

510
          @Override
511
          Status prestart() {
512
            return uncommittedRetriableStreamsRegistry.add(this);
1✔
513
          }
514

515
          @Override
516
          void postCommit() {
517
            uncommittedRetriableStreamsRegistry.remove(this);
1✔
518
          }
1✔
519

520
          @Override
521
          ClientStream newSubstream(
522
              Metadata newHeaders, ClientStreamTracer.Factory factory, int previousAttempts,
523
              boolean isTransparentRetry) {
524
            CallOptions newOptions = callOptions.withStreamTracerFactory(factory);
1✔
525
            ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers(
1✔
526
                newOptions, newHeaders, previousAttempts, isTransparentRetry);
527
            Context origContext = context.attach();
1✔
528
            try {
529
              return delayedTransport.newStream(method, newHeaders, newOptions, tracers);
1✔
530
            } finally {
531
              context.detach(origContext);
1✔
532
            }
533
          }
534
        }
535

536
        return new RetryStream<>();
1✔
537
      }
538
    }
539
  }
540

541
  private final ChannelStreamProvider transportProvider = new ChannelStreamProvider();
1✔
542

543
  private final Rescheduler idleTimer;
544
  private final MetricRecorder metricRecorder;
545

546
  ManagedChannelImpl(
547
      ManagedChannelImplBuilder builder,
548
      ClientTransportFactory clientTransportFactory,
549
      URI targetUri,
550
      NameResolverProvider nameResolverProvider,
551
      BackoffPolicy.Provider backoffPolicyProvider,
552
      ObjectPool<? extends Executor> balancerRpcExecutorPool,
553
      Supplier<Stopwatch> stopwatchSupplier,
554
      List<ClientInterceptor> interceptors,
555
      final TimeProvider timeProvider) {
1✔
556
    this.target = checkNotNull(builder.target, "target");
1✔
557
    this.logId = InternalLogId.allocate("Channel", target);
1✔
558
    this.timeProvider = checkNotNull(timeProvider, "timeProvider");
1✔
559
    this.executorPool = checkNotNull(builder.executorPool, "executorPool");
1✔
560
    this.executor = checkNotNull(executorPool.getObject(), "executor");
1✔
561
    this.originalChannelCreds = builder.channelCredentials;
1✔
562
    this.originalTransportFactory = clientTransportFactory;
1✔
563
    this.offloadExecutorHolder =
1✔
564
        new ExecutorHolder(checkNotNull(builder.offloadExecutorPool, "offloadExecutorPool"));
1✔
565
    this.transportFactory = new CallCredentialsApplyingTransportFactory(
1✔
566
        clientTransportFactory, builder.callCredentials, this.offloadExecutorHolder);
567
    this.oobTransportFactory = new CallCredentialsApplyingTransportFactory(
1✔
568
        clientTransportFactory, null, this.offloadExecutorHolder);
569
    this.scheduledExecutor =
1✔
570
        new RestrictedScheduledExecutor(transportFactory.getScheduledExecutorService());
1✔
571
    maxTraceEvents = builder.maxTraceEvents;
1✔
572
    channelTracer = new ChannelTracer(
1✔
573
        logId, builder.maxTraceEvents, timeProvider.currentTimeNanos(),
1✔
574
        "Channel for '" + target + "'");
575
    channelLogger = new ChannelLoggerImpl(channelTracer, timeProvider);
1✔
576
    ProxyDetector proxyDetector =
577
        builder.proxyDetector != null ? builder.proxyDetector : GrpcUtil.DEFAULT_PROXY_DETECTOR;
1✔
578
    this.retryEnabled = builder.retryEnabled;
1✔
579
    this.loadBalancerFactory = new AutoConfiguredLoadBalancerFactory(builder.defaultLbPolicy);
1✔
580
    this.nameResolverRegistry = builder.nameResolverRegistry;
1✔
581
    this.targetUri = checkNotNull(targetUri, "targetUri");
1✔
582
    this.nameResolverProvider = checkNotNull(nameResolverProvider, "nameResolverProvider");
1✔
583
    ScParser serviceConfigParser =
1✔
584
        new ScParser(
585
            retryEnabled,
586
            builder.maxRetryAttempts,
587
            builder.maxHedgedAttempts,
588
            loadBalancerFactory);
589
    this.authorityOverride = builder.authorityOverride;
1✔
590
    this.metricRecorder = new MetricRecorderImpl(builder.metricSinks,
1✔
591
        MetricInstrumentRegistry.getDefaultRegistry());
1✔
592
    NameResolver.Args.Builder nameResolverArgsBuilder = NameResolver.Args.newBuilder()
1✔
593
            .setDefaultPort(builder.getDefaultPort())
1✔
594
            .setProxyDetector(proxyDetector)
1✔
595
            .setSynchronizationContext(syncContext)
1✔
596
            .setScheduledExecutorService(scheduledExecutor)
1✔
597
            .setServiceConfigParser(serviceConfigParser)
1✔
598
            .setChannelLogger(channelLogger)
1✔
599
            .setOffloadExecutor(this.offloadExecutorHolder)
1✔
600
            .setOverrideAuthority(this.authorityOverride)
1✔
601
            .setMetricRecorder(this.metricRecorder);
1✔
602
    builder.copyAllNameResolverCustomArgsTo(nameResolverArgsBuilder);
1✔
603
    this.nameResolverArgs = nameResolverArgsBuilder.build();
1✔
604
    this.nameResolver = getNameResolver(
1✔
605
        targetUri, authorityOverride, nameResolverProvider, nameResolverArgs);
606
    this.balancerRpcExecutorPool = checkNotNull(balancerRpcExecutorPool, "balancerRpcExecutorPool");
1✔
607
    this.balancerRpcExecutorHolder = new ExecutorHolder(balancerRpcExecutorPool);
1✔
608
    this.delayedTransport = new DelayedClientTransport(this.executor, this.syncContext);
1✔
609
    this.delayedTransport.start(delayedTransportListener);
1✔
610
    this.backoffPolicyProvider = backoffPolicyProvider;
1✔
611

612
    if (builder.defaultServiceConfig != null) {
1✔
613
      ConfigOrError parsedDefaultServiceConfig =
1✔
614
          serviceConfigParser.parseServiceConfig(builder.defaultServiceConfig);
1✔
615
      checkState(
1✔
616
          parsedDefaultServiceConfig.getError() == null,
1✔
617
          "Default config is invalid: %s",
618
          parsedDefaultServiceConfig.getError());
1✔
619
      this.defaultServiceConfig =
1✔
620
          (ManagedChannelServiceConfig) parsedDefaultServiceConfig.getConfig();
1✔
621
      this.transportProvider.throttle = this.defaultServiceConfig.getRetryThrottling();
1✔
622
    } else {
1✔
623
      this.defaultServiceConfig = null;
1✔
624
    }
625
    this.lookUpServiceConfig = builder.lookUpServiceConfig;
1✔
626
    realChannel = new RealChannel(nameResolver.getServiceAuthority());
1✔
627
    Channel channel = realChannel;
1✔
628
    if (builder.binlog != null) {
1✔
629
      channel = builder.binlog.wrapChannel(channel);
1✔
630
    }
631
    this.interceptorChannel = ClientInterceptors.intercept(channel, interceptors);
1✔
632
    this.transportFilters = new ArrayList<>(builder.transportFilters);
1✔
633
    this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier");
1✔
634
    if (builder.idleTimeoutMillis == IDLE_TIMEOUT_MILLIS_DISABLE) {
1✔
635
      this.idleTimeoutMillis = builder.idleTimeoutMillis;
1✔
636
    } else {
637
      checkArgument(
1✔
638
          builder.idleTimeoutMillis
639
              >= ManagedChannelImplBuilder.IDLE_MODE_MIN_TIMEOUT_MILLIS,
640
          "invalid idleTimeoutMillis %s", builder.idleTimeoutMillis);
641
      this.idleTimeoutMillis = builder.idleTimeoutMillis;
1✔
642
    }
643

644
    idleTimer = new Rescheduler(
1✔
645
        new IdleModeTimer(),
646
        syncContext,
647
        transportFactory.getScheduledExecutorService(),
1✔
648
        stopwatchSupplier.get());
1✔
649
    this.fullStreamDecompression = builder.fullStreamDecompression;
1✔
650
    this.decompressorRegistry = checkNotNull(builder.decompressorRegistry, "decompressorRegistry");
1✔
651
    this.compressorRegistry = checkNotNull(builder.compressorRegistry, "compressorRegistry");
1✔
652
    this.userAgent = builder.userAgent;
1✔
653

654
    this.channelBufferLimit = builder.retryBufferSize;
1✔
655
    this.perRpcBufferLimit = builder.perRpcBufferLimit;
1✔
656
    final class ChannelCallTracerFactory implements CallTracer.Factory {
1✔
657
      @Override
658
      public CallTracer create() {
659
        return new CallTracer(timeProvider);
1✔
660
      }
661
    }
662

663
    this.callTracerFactory = new ChannelCallTracerFactory();
1✔
664
    channelCallTracer = callTracerFactory.create();
1✔
665
    this.channelz = checkNotNull(builder.channelz);
1✔
666
    channelz.addRootChannel(this);
1✔
667

668
    if (!lookUpServiceConfig) {
1✔
669
      if (defaultServiceConfig != null) {
1✔
670
        channelLogger.log(
1✔
671
            ChannelLogLevel.INFO, "Service config look-up disabled, using default service config");
672
      }
673
      serviceConfigUpdated = true;
1✔
674
    }
675
  }
1✔
676

677
  @VisibleForTesting
678
  static NameResolver getNameResolver(
679
      URI targetUri, @Nullable final String overrideAuthority,
680
      NameResolverProvider provider, NameResolver.Args nameResolverArgs) {
681
    NameResolver resolver = provider.newNameResolver(targetUri, nameResolverArgs);
1✔
682
    if (resolver == null) {
1✔
683
      throw new IllegalArgumentException("cannot create a NameResolver for " + targetUri);
1✔
684
    }
685

686
    // We wrap the name resolver in a RetryingNameResolver to give it the ability to retry failures.
687
    // TODO: After a transition period, all NameResolver implementations that need retry should use
688
    //       RetryingNameResolver directly and this step can be removed.
689
    NameResolver usedNameResolver = new RetryingNameResolver(resolver,
1✔
690
          new BackoffPolicyRetryScheduler(new ExponentialBackoffPolicy.Provider(),
691
              nameResolverArgs.getScheduledExecutorService(),
1✔
692
              nameResolverArgs.getSynchronizationContext()),
1✔
693
          nameResolverArgs.getSynchronizationContext());
1✔
694

695
    if (overrideAuthority == null) {
1✔
696
      return usedNameResolver;
1✔
697
    }
698

699
    return new ForwardingNameResolver(usedNameResolver) {
1✔
700
      @Override
701
      public String getServiceAuthority() {
702
        return overrideAuthority;
1✔
703
      }
704
    };
705
  }
706

707
  @VisibleForTesting
708
  InternalConfigSelector getConfigSelector() {
709
    return realChannel.configSelector.get();
1✔
710
  }
711
  
712
  @VisibleForTesting
713
  boolean hasThrottle() {
714
    return this.transportProvider.throttle != null;
1✔
715
  }
716

717
  /**
718
   * Initiates an orderly shutdown in which preexisting calls continue but new calls are immediately
719
   * cancelled.
720
   */
721
  @Override
722
  public ManagedChannelImpl shutdown() {
723
    channelLogger.log(ChannelLogLevel.DEBUG, "shutdown() called");
1✔
724
    if (!shutdown.compareAndSet(false, true)) {
1✔
725
      return this;
1✔
726
    }
727
    final class Shutdown implements Runnable {
1✔
728
      @Override
729
      public void run() {
730
        channelLogger.log(ChannelLogLevel.INFO, "Entering SHUTDOWN state");
1✔
731
        channelStateManager.gotoState(SHUTDOWN);
1✔
732
      }
1✔
733
    }
734

735
    syncContext.execute(new Shutdown());
1✔
736
    realChannel.shutdown();
1✔
737
    final class CancelIdleTimer implements Runnable {
1✔
738
      @Override
739
      public void run() {
740
        cancelIdleTimer(/* permanent= */ true);
1✔
741
      }
1✔
742
    }
743

744
    syncContext.execute(new CancelIdleTimer());
1✔
745
    return this;
1✔
746
  }
747

748
  /**
749
   * Initiates a forceful shutdown in which preexisting and new calls are cancelled. Although
750
   * forceful, the shutdown process is still not instantaneous; {@link #isTerminated()} will likely
751
   * return {@code false} immediately after this method returns.
752
   */
753
  @Override
754
  public ManagedChannelImpl shutdownNow() {
755
    channelLogger.log(ChannelLogLevel.DEBUG, "shutdownNow() called");
1✔
756
    shutdown();
1✔
757
    realChannel.shutdownNow();
1✔
758
    final class ShutdownNow implements Runnable {
1✔
759
      @Override
760
      public void run() {
761
        if (shutdownNowed) {
1✔
762
          return;
1✔
763
        }
764
        shutdownNowed = true;
1✔
765
        maybeShutdownNowSubchannels();
1✔
766
      }
1✔
767
    }
768

769
    syncContext.execute(new ShutdownNow());
1✔
770
    return this;
1✔
771
  }
772

773
  // Called from syncContext
774
  @VisibleForTesting
775
  void panic(final Throwable t) {
776
    if (panicMode) {
1✔
777
      // Preserve the first panic information
778
      return;
×
779
    }
780
    panicMode = true;
1✔
781
    try {
782
      cancelIdleTimer(/* permanent= */ true);
1✔
783
      shutdownNameResolverAndLoadBalancer(false);
1✔
784
    } finally {
785
      updateSubchannelPicker(new LoadBalancer.FixedResultPicker(PickResult.withDrop(
1✔
786
          Status.INTERNAL.withDescription("Panic! This is a bug!").withCause(t))));
1✔
787
      realChannel.updateConfigSelector(null);
1✔
788
      channelLogger.log(ChannelLogLevel.ERROR, "PANIC! Entering TRANSIENT_FAILURE");
1✔
789
      channelStateManager.gotoState(TRANSIENT_FAILURE);
1✔
790
    }
791
  }
1✔
792

793
  @VisibleForTesting
794
  boolean isInPanicMode() {
795
    return panicMode;
1✔
796
  }
797

798
  // Called from syncContext
799
  private void updateSubchannelPicker(SubchannelPicker newPicker) {
800
    delayedTransport.reprocess(newPicker);
1✔
801
  }
1✔
802

803
  @Override
804
  public boolean isShutdown() {
805
    return shutdown.get();
1✔
806
  }
807

808
  @Override
809
  public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
810
    return terminatedLatch.await(timeout, unit);
1✔
811
  }
812

813
  @Override
814
  public boolean isTerminated() {
815
    return terminated;
1✔
816
  }
817

818
  /*
819
   * Creates a new outgoing call on the channel.
820
   */
821
  @Override
822
  public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(MethodDescriptor<ReqT, RespT> method,
823
      CallOptions callOptions) {
824
    return interceptorChannel.newCall(method, callOptions);
1✔
825
  }
826

827
  @Override
828
  public String authority() {
829
    return interceptorChannel.authority();
1✔
830
  }
831

832
  private Executor getCallExecutor(CallOptions callOptions) {
833
    Executor executor = callOptions.getExecutor();
1✔
834
    if (executor == null) {
1✔
835
      executor = this.executor;
1✔
836
    }
837
    return executor;
1✔
838
  }
839

840
  private class RealChannel extends Channel {
841
    // Reference to null if no config selector is available from resolution result
842
    // Reference must be set() from syncContext
843
    private final AtomicReference<InternalConfigSelector> configSelector =
1✔
844
        new AtomicReference<>(INITIAL_PENDING_SELECTOR);
1✔
845
    // Set when the NameResolver is initially created. When we create a new NameResolver for the
846
    // same target, the new instance must have the same value.
847
    private final String authority;
848

849
    private final Channel clientCallImplChannel = new Channel() {
1✔
850
      @Override
851
      public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
852
          MethodDescriptor<RequestT, ResponseT> method, CallOptions callOptions) {
853
        return new ClientCallImpl<>(
1✔
854
            method,
855
            getCallExecutor(callOptions),
1✔
856
            callOptions,
857
            transportProvider,
1✔
858
            terminated ? null : transportFactory.getScheduledExecutorService(),
1✔
859
            channelCallTracer,
1✔
860
            null)
861
            .setFullStreamDecompression(fullStreamDecompression)
1✔
862
            .setDecompressorRegistry(decompressorRegistry)
1✔
863
            .setCompressorRegistry(compressorRegistry);
1✔
864
      }
865

866
      @Override
867
      public String authority() {
868
        return authority;
×
869
      }
870
    };
871

872
    private RealChannel(String authority) {
1✔
873
      this.authority =  checkNotNull(authority, "authority");
1✔
874
    }
1✔
875

876
    @Override
877
    public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(
878
        MethodDescriptor<ReqT, RespT> method, CallOptions callOptions) {
879
      if (configSelector.get() != INITIAL_PENDING_SELECTOR) {
1✔
880
        return newClientCall(method, callOptions);
1✔
881
      }
882
      syncContext.execute(new Runnable() {
1✔
883
        @Override
884
        public void run() {
885
          exitIdleMode();
1✔
886
        }
1✔
887
      });
888
      if (configSelector.get() != INITIAL_PENDING_SELECTOR) {
1✔
889
        // This is an optimization for the case (typically with InProcessTransport) when name
890
        // resolution result is immediately available at this point. Otherwise, some users'
891
        // tests might observe slight behavior difference from earlier grpc versions.
892
        return newClientCall(method, callOptions);
1✔
893
      }
894
      if (shutdown.get()) {
1✔
895
        // Return a failing ClientCall.
896
        return new ClientCall<ReqT, RespT>() {
×
897
          @Override
898
          public void start(Listener<RespT> responseListener, Metadata headers) {
899
            responseListener.onClose(SHUTDOWN_STATUS, new Metadata());
×
900
          }
×
901

902
          @Override public void request(int numMessages) {}
×
903

904
          @Override public void cancel(@Nullable String message, @Nullable Throwable cause) {}
×
905

906
          @Override public void halfClose() {}
×
907

908
          @Override public void sendMessage(ReqT message) {}
×
909
        };
910
      }
911
      Context context = Context.current();
1✔
912
      final PendingCall<ReqT, RespT> pendingCall = new PendingCall<>(context, method, callOptions);
1✔
913
      syncContext.execute(new Runnable() {
1✔
914
        @Override
915
        public void run() {
916
          if (configSelector.get() == INITIAL_PENDING_SELECTOR) {
1✔
917
            if (pendingCalls == null) {
1✔
918
              pendingCalls = new LinkedHashSet<>();
1✔
919
              inUseStateAggregator.updateObjectInUse(pendingCallsInUseObject, true);
1✔
920
            }
921
            pendingCalls.add(pendingCall);
1✔
922
          } else {
923
            pendingCall.reprocess();
1✔
924
          }
925
        }
1✔
926
      });
927
      return pendingCall;
1✔
928
    }
929

930
    // Must run in SynchronizationContext.
931
    void updateConfigSelector(@Nullable InternalConfigSelector config) {
932
      InternalConfigSelector prevConfig = configSelector.get();
1✔
933
      configSelector.set(config);
1✔
934
      if (prevConfig == INITIAL_PENDING_SELECTOR && pendingCalls != null) {
1✔
935
        for (RealChannel.PendingCall<?, ?> pendingCall : pendingCalls) {
1✔
936
          pendingCall.reprocess();
1✔
937
        }
1✔
938
      }
939
    }
1✔
940

941
    // Must run in SynchronizationContext.
942
    void onConfigError() {
943
      if (configSelector.get() == INITIAL_PENDING_SELECTOR) {
1✔
944
        // Apply Default Service Config if initial name resolution fails.
945
        if (defaultServiceConfig != null) {
1✔
946
          updateConfigSelector(defaultServiceConfig.getDefaultConfigSelector());
1✔
947
          lastServiceConfig = defaultServiceConfig;
1✔
948
          channelLogger.log(ChannelLogLevel.ERROR,
1✔
949
              "Initial Name Resolution error, using default service config");
950
        } else {
951
          updateConfigSelector(null);
1✔
952
        }
953
      }
954
    }
1✔
955

956
    void shutdown() {
957
      final class RealChannelShutdown implements Runnable {
1✔
958
        @Override
959
        public void run() {
960
          if (pendingCalls == null) {
1✔
961
            if (configSelector.get() == INITIAL_PENDING_SELECTOR) {
1✔
962
              configSelector.set(null);
1✔
963
            }
964
            uncommittedRetriableStreamsRegistry.onShutdown(SHUTDOWN_STATUS);
1✔
965
          }
966
        }
1✔
967
      }
968

969
      syncContext.execute(new RealChannelShutdown());
1✔
970
    }
1✔
971

972
    void shutdownNow() {
973
      final class RealChannelShutdownNow implements Runnable {
1✔
974
        @Override
975
        public void run() {
976
          if (configSelector.get() == INITIAL_PENDING_SELECTOR) {
1✔
977
            configSelector.set(null);
1✔
978
          }
979
          if (pendingCalls != null) {
1✔
980
            for (RealChannel.PendingCall<?, ?> pendingCall : pendingCalls) {
1✔
981
              pendingCall.cancel("Channel is forcefully shutdown", null);
1✔
982
            }
1✔
983
          }
984
          uncommittedRetriableStreamsRegistry.onShutdownNow(SHUTDOWN_NOW_STATUS);
1✔
985
        }
1✔
986
      }
987

988
      syncContext.execute(new RealChannelShutdownNow());
1✔
989
    }
1✔
990

991
    @Override
992
    public String authority() {
993
      return authority;
1✔
994
    }
995

996
    private final class PendingCall<ReqT, RespT> extends DelayedClientCall<ReqT, RespT> {
997
      final Context context;
998
      final MethodDescriptor<ReqT, RespT> method;
999
      final CallOptions callOptions;
1000
      private final long callCreationTime;
1001

1002
      PendingCall(
1003
          Context context, MethodDescriptor<ReqT, RespT> method, CallOptions callOptions) {
1✔
1004
        super(getCallExecutor(callOptions), scheduledExecutor, callOptions.getDeadline());
1✔
1005
        this.context = context;
1✔
1006
        this.method = method;
1✔
1007
        this.callOptions = callOptions;
1✔
1008
        this.callCreationTime = ticker.nanoTime();
1✔
1009
      }
1✔
1010

1011
      /** Called when it's ready to create a real call and reprocess the pending call. */
1012
      void reprocess() {
1013
        ClientCall<ReqT, RespT> realCall;
1014
        Context previous = context.attach();
1✔
1015
        try {
1016
          CallOptions delayResolutionOption = callOptions.withOption(NAME_RESOLUTION_DELAYED,
1✔
1017
              ticker.nanoTime() - callCreationTime);
1✔
1018
          realCall = newClientCall(method, delayResolutionOption);
1✔
1019
        } finally {
1020
          context.detach(previous);
1✔
1021
        }
1022
        Runnable toRun = setCall(realCall);
1✔
1023
        if (toRun == null) {
1✔
1024
          syncContext.execute(new PendingCallRemoval());
1✔
1025
        } else {
1026
          getCallExecutor(callOptions).execute(new Runnable() {
1✔
1027
            @Override
1028
            public void run() {
1029
              toRun.run();
1✔
1030
              syncContext.execute(new PendingCallRemoval());
1✔
1031
            }
1✔
1032
          });
1033
        }
1034
      }
1✔
1035

1036
      @Override
1037
      protected void callCancelled() {
1038
        super.callCancelled();
1✔
1039
        syncContext.execute(new PendingCallRemoval());
1✔
1040
      }
1✔
1041

1042
      final class PendingCallRemoval implements Runnable {
1✔
1043
        @Override
1044
        public void run() {
1045
          if (pendingCalls != null) {
1✔
1046
            pendingCalls.remove(PendingCall.this);
1✔
1047
            if (pendingCalls.isEmpty()) {
1✔
1048
              inUseStateAggregator.updateObjectInUse(pendingCallsInUseObject, false);
1✔
1049
              pendingCalls = null;
1✔
1050
              if (shutdown.get()) {
1✔
1051
                uncommittedRetriableStreamsRegistry.onShutdown(SHUTDOWN_STATUS);
1✔
1052
              }
1053
            }
1054
          }
1055
        }
1✔
1056
      }
1057
    }
1058

1059
    private <ReqT, RespT> ClientCall<ReqT, RespT> newClientCall(
1060
        MethodDescriptor<ReqT, RespT> method, CallOptions callOptions) {
1061
      InternalConfigSelector selector = configSelector.get();
1✔
1062
      if (selector == null) {
1✔
1063
        return clientCallImplChannel.newCall(method, callOptions);
1✔
1064
      }
1065
      if (selector instanceof ServiceConfigConvertedSelector) {
1✔
1066
        MethodInfo methodInfo =
1✔
1067
            ((ServiceConfigConvertedSelector) selector).config.getMethodConfig(method);
1✔
1068
        if (methodInfo != null) {
1✔
1069
          callOptions = callOptions.withOption(MethodInfo.KEY, methodInfo);
1✔
1070
        }
1071
        return clientCallImplChannel.newCall(method, callOptions);
1✔
1072
      }
1073
      return new ConfigSelectingClientCall<>(
1✔
1074
          selector, clientCallImplChannel, executor, method, callOptions);
1✔
1075
    }
1076
  }
1077

1078
  /**
1079
   * A client call for a given channel that applies a given config selector when it starts.
1080
   */
1081
  static final class ConfigSelectingClientCall<ReqT, RespT>
1082
      extends ForwardingClientCall<ReqT, RespT> {
1083

1084
    private final InternalConfigSelector configSelector;
1085
    private final Channel channel;
1086
    private final Executor callExecutor;
1087
    private final MethodDescriptor<ReqT, RespT> method;
1088
    private final Context context;
1089
    private CallOptions callOptions;
1090

1091
    private ClientCall<ReqT, RespT> delegate;
1092

1093
    ConfigSelectingClientCall(
1094
        InternalConfigSelector configSelector, Channel channel, Executor channelExecutor,
1095
        MethodDescriptor<ReqT, RespT> method,
1096
        CallOptions callOptions) {
1✔
1097
      this.configSelector = configSelector;
1✔
1098
      this.channel = channel;
1✔
1099
      this.method = method;
1✔
1100
      this.callExecutor =
1✔
1101
          callOptions.getExecutor() == null ? channelExecutor : callOptions.getExecutor();
1✔
1102
      this.callOptions = callOptions.withExecutor(callExecutor);
1✔
1103
      this.context = Context.current();
1✔
1104
    }
1✔
1105

1106
    @Override
1107
    protected ClientCall<ReqT, RespT> delegate() {
1108
      return delegate;
1✔
1109
    }
1110

1111
    @SuppressWarnings("unchecked")
1112
    @Override
1113
    public void start(Listener<RespT> observer, Metadata headers) {
1114
      PickSubchannelArgs args =
1✔
1115
          new PickSubchannelArgsImpl(method, headers, callOptions, NOOP_PICK_DETAILS_CONSUMER);
1✔
1116
      InternalConfigSelector.Result result = configSelector.selectConfig(args);
1✔
1117
      Status status = result.getStatus();
1✔
1118
      if (!status.isOk()) {
1✔
1119
        executeCloseObserverInContext(observer,
1✔
1120
            GrpcUtil.replaceInappropriateControlPlaneStatus(status));
1✔
1121
        delegate = (ClientCall<ReqT, RespT>) NOOP_CALL;
1✔
1122
        return;
1✔
1123
      }
1124
      ClientInterceptor interceptor = result.getInterceptor();
1✔
1125
      ManagedChannelServiceConfig config = (ManagedChannelServiceConfig) result.getConfig();
1✔
1126
      MethodInfo methodInfo = config.getMethodConfig(method);
1✔
1127
      if (methodInfo != null) {
1✔
1128
        callOptions = callOptions.withOption(MethodInfo.KEY, methodInfo);
1✔
1129
      }
1130
      if (interceptor != null) {
1✔
1131
        delegate = interceptor.interceptCall(method, callOptions, channel);
1✔
1132
      } else {
1133
        delegate = channel.newCall(method, callOptions);
×
1134
      }
1135
      delegate.start(observer, headers);
1✔
1136
    }
1✔
1137

1138
    private void executeCloseObserverInContext(
1139
        final Listener<RespT> observer, final Status status) {
1140
      class CloseInContext extends ContextRunnable {
1141
        CloseInContext() {
1✔
1142
          super(context);
1✔
1143
        }
1✔
1144

1145
        @Override
1146
        public void runInContext() {
1147
          observer.onClose(status, new Metadata());
1✔
1148
        }
1✔
1149
      }
1150

1151
      callExecutor.execute(new CloseInContext());
1✔
1152
    }
1✔
1153

1154
    @Override
1155
    public void cancel(@Nullable String message, @Nullable Throwable cause) {
1156
      if (delegate != null) {
×
1157
        delegate.cancel(message, cause);
×
1158
      }
1159
    }
×
1160
  }
1161

1162
  private static final ClientCall<Object, Object> NOOP_CALL = new ClientCall<Object, Object>() {
1✔
1163
    @Override
1164
    public void start(Listener<Object> responseListener, Metadata headers) {}
×
1165

1166
    @Override
1167
    public void request(int numMessages) {}
1✔
1168

1169
    @Override
1170
    public void cancel(String message, Throwable cause) {}
×
1171

1172
    @Override
1173
    public void halfClose() {}
×
1174

1175
    @Override
1176
    public void sendMessage(Object message) {}
×
1177

1178
    // Always returns {@code false}, since this is only used when the startup of the call fails.
1179
    @Override
1180
    public boolean isReady() {
1181
      return false;
×
1182
    }
1183
  };
1184

1185
  /**
1186
   * Terminate the channel if termination conditions are met.
1187
   */
1188
  // Must be run from syncContext
1189
  private void maybeTerminateChannel() {
1190
    if (terminated) {
1✔
1191
      return;
×
1192
    }
1193
    if (shutdown.get() && subchannels.isEmpty() && oobChannels.isEmpty()) {
1✔
1194
      channelLogger.log(ChannelLogLevel.INFO, "Terminated");
1✔
1195
      channelz.removeRootChannel(this);
1✔
1196
      executorPool.returnObject(executor);
1✔
1197
      balancerRpcExecutorHolder.release();
1✔
1198
      offloadExecutorHolder.release();
1✔
1199
      // Release the transport factory so that it can deallocate any resources.
1200
      transportFactory.close();
1✔
1201

1202
      terminated = true;
1✔
1203
      terminatedLatch.countDown();
1✔
1204
    }
1205
  }
1✔
1206

1207
  // Must be called from syncContext
1208
  private void handleInternalSubchannelState(ConnectivityStateInfo newState) {
1209
    if (newState.getState() == TRANSIENT_FAILURE || newState.getState() == IDLE) {
1✔
1210
      refreshNameResolution();
1✔
1211
    }
1212
  }
1✔
1213

1214
  @Override
1215
  @SuppressWarnings("deprecation")
1216
  public ConnectivityState getState(boolean requestConnection) {
1217
    ConnectivityState savedChannelState = channelStateManager.getState();
1✔
1218
    if (requestConnection && savedChannelState == IDLE) {
1✔
1219
      final class RequestConnection implements Runnable {
1✔
1220
        @Override
1221
        public void run() {
1222
          exitIdleMode();
1✔
1223
          if (lbHelper != null) {
1✔
1224
            lbHelper.lb.requestConnection();
1✔
1225
          }
1226
        }
1✔
1227
      }
1228

1229
      syncContext.execute(new RequestConnection());
1✔
1230
    }
1231
    return savedChannelState;
1✔
1232
  }
1233

1234
  @Override
1235
  public void notifyWhenStateChanged(final ConnectivityState source, final Runnable callback) {
1236
    final class NotifyStateChanged implements Runnable {
1✔
1237
      @Override
1238
      public void run() {
1239
        channelStateManager.notifyWhenStateChanged(callback, executor, source);
1✔
1240
      }
1✔
1241
    }
1242

1243
    syncContext.execute(new NotifyStateChanged());
1✔
1244
  }
1✔
1245

1246
  @Override
1247
  public void resetConnectBackoff() {
1248
    final class ResetConnectBackoff implements Runnable {
1✔
1249
      @Override
1250
      public void run() {
1251
        if (shutdown.get()) {
1✔
1252
          return;
1✔
1253
        }
1254
        if (nameResolverStarted) {
1✔
1255
          refreshNameResolution();
1✔
1256
        }
1257
        for (InternalSubchannel subchannel : subchannels) {
1✔
1258
          subchannel.resetConnectBackoff();
1✔
1259
        }
1✔
1260
        for (OobChannel oobChannel : oobChannels) {
1✔
1261
          oobChannel.resetConnectBackoff();
×
1262
        }
×
1263
      }
1✔
1264
    }
1265

1266
    syncContext.execute(new ResetConnectBackoff());
1✔
1267
  }
1✔
1268

1269
  @Override
1270
  public void enterIdle() {
1271
    final class PrepareToLoseNetworkRunnable implements Runnable {
1✔
1272
      @Override
1273
      public void run() {
1274
        if (shutdown.get() || lbHelper == null) {
1✔
1275
          return;
1✔
1276
        }
1277
        cancelIdleTimer(/* permanent= */ false);
1✔
1278
        enterIdleMode();
1✔
1279
      }
1✔
1280
    }
1281

1282
    syncContext.execute(new PrepareToLoseNetworkRunnable());
1✔
1283
  }
1✔
1284

1285
  /**
1286
   * A registry that prevents channel shutdown from killing existing retry attempts that are in
1287
   * backoff.
1288
   */
1289
  private final class UncommittedRetriableStreamsRegistry {
1✔
1290
    // TODO(zdapeng): This means we would acquire a lock for each new retry-able stream,
1291
    // it's worthwhile to look for a lock-free approach.
1292
    final Object lock = new Object();
1✔
1293

1294
    @GuardedBy("lock")
1✔
1295
    Collection<ClientStream> uncommittedRetriableStreams = new HashSet<>();
1296

1297
    @GuardedBy("lock")
1298
    Status shutdownStatus;
1299

1300
    void onShutdown(Status reason) {
1301
      boolean shouldShutdownDelayedTransport = false;
1✔
1302
      synchronized (lock) {
1✔
1303
        if (shutdownStatus != null) {
1✔
1304
          return;
1✔
1305
        }
1306
        shutdownStatus = reason;
1✔
1307
        // Keep the delayedTransport open until there is no more uncommitted streams, b/c those
1308
        // retriable streams, which may be in backoff and not using any transport, are already
1309
        // started RPCs.
1310
        if (uncommittedRetriableStreams.isEmpty()) {
1✔
1311
          shouldShutdownDelayedTransport = true;
1✔
1312
        }
1313
      }
1✔
1314

1315
      if (shouldShutdownDelayedTransport) {
1✔
1316
        delayedTransport.shutdown(reason);
1✔
1317
      }
1318
    }
1✔
1319

1320
    void onShutdownNow(Status reason) {
1321
      onShutdown(reason);
1✔
1322
      Collection<ClientStream> streams;
1323

1324
      synchronized (lock) {
1✔
1325
        streams = new ArrayList<>(uncommittedRetriableStreams);
1✔
1326
      }
1✔
1327

1328
      for (ClientStream stream : streams) {
1✔
1329
        stream.cancel(reason);
1✔
1330
      }
1✔
1331
      delayedTransport.shutdownNow(reason);
1✔
1332
    }
1✔
1333

1334
    /**
1335
     * Registers a RetriableStream and return null if not shutdown, otherwise just returns the
1336
     * shutdown Status.
1337
     */
1338
    @Nullable
1339
    Status add(RetriableStream<?> retriableStream) {
1340
      synchronized (lock) {
1✔
1341
        if (shutdownStatus != null) {
1✔
1342
          return shutdownStatus;
1✔
1343
        }
1344
        uncommittedRetriableStreams.add(retriableStream);
1✔
1345
        return null;
1✔
1346
      }
1347
    }
1348

1349
    void remove(RetriableStream<?> retriableStream) {
1350
      Status shutdownStatusCopy = null;
1✔
1351

1352
      synchronized (lock) {
1✔
1353
        uncommittedRetriableStreams.remove(retriableStream);
1✔
1354
        if (uncommittedRetriableStreams.isEmpty()) {
1✔
1355
          shutdownStatusCopy = shutdownStatus;
1✔
1356
          // Because retriable transport is long-lived, we take this opportunity to down-size the
1357
          // hashmap.
1358
          uncommittedRetriableStreams = new HashSet<>();
1✔
1359
        }
1360
      }
1✔
1361

1362
      if (shutdownStatusCopy != null) {
1✔
1363
        delayedTransport.shutdown(shutdownStatusCopy);
1✔
1364
      }
1365
    }
1✔
1366
  }
1367

1368
  private final class LbHelperImpl extends LoadBalancer.Helper {
1✔
1369
    AutoConfiguredLoadBalancer lb;
1370

1371
    @Override
1372
    public AbstractSubchannel createSubchannel(CreateSubchannelArgs args) {
1373
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1374
      // No new subchannel should be created after load balancer has been shutdown.
1375
      checkState(!terminating, "Channel is being terminated");
1✔
1376
      return new SubchannelImpl(args);
1✔
1377
    }
1378

1379
    @Override
1380
    public void updateBalancingState(
1381
        final ConnectivityState newState, final SubchannelPicker newPicker) {
1382
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1383
      checkNotNull(newState, "newState");
1✔
1384
      checkNotNull(newPicker, "newPicker");
1✔
1385

1386
      if (LbHelperImpl.this != lbHelper || panicMode) {
1✔
1387
        return;
1✔
1388
      }
1389
      updateSubchannelPicker(newPicker);
1✔
1390
      // It's not appropriate to report SHUTDOWN state from lb.
1391
      // Ignore the case of newState == SHUTDOWN for now.
1392
      if (newState != SHUTDOWN) {
1✔
1393
        channelLogger.log(
1✔
1394
            ChannelLogLevel.INFO, "Entering {0} state with picker: {1}", newState, newPicker);
1395
        channelStateManager.gotoState(newState);
1✔
1396
      }
1397
    }
1✔
1398

1399
    @Override
1400
    public void refreshNameResolution() {
1401
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1402
      final class LoadBalancerRefreshNameResolution implements Runnable {
1✔
1403
        @Override
1404
        public void run() {
1405
          ManagedChannelImpl.this.refreshNameResolution();
1✔
1406
        }
1✔
1407
      }
1408

1409
      syncContext.execute(new LoadBalancerRefreshNameResolution());
1✔
1410
    }
1✔
1411

1412
    @Override
1413
    public ManagedChannel createOobChannel(EquivalentAddressGroup addressGroup, String authority) {
1414
      return createOobChannel(Collections.singletonList(addressGroup), authority);
×
1415
    }
1416

1417
    @Override
1418
    public ManagedChannel createOobChannel(List<EquivalentAddressGroup> addressGroup,
1419
        String authority) {
1420
      // TODO(ejona): can we be even stricter? Like terminating?
1421
      checkState(!terminated, "Channel is terminated");
1✔
1422
      long oobChannelCreationTime = timeProvider.currentTimeNanos();
1✔
1423
      InternalLogId oobLogId = InternalLogId.allocate("OobChannel", /*details=*/ null);
1✔
1424
      InternalLogId subchannelLogId =
1✔
1425
          InternalLogId.allocate("Subchannel-OOB", /*details=*/ authority);
1✔
1426
      ChannelTracer oobChannelTracer =
1✔
1427
          new ChannelTracer(
1428
              oobLogId, maxTraceEvents, oobChannelCreationTime,
1✔
1429
              "OobChannel for " + addressGroup);
1430
      final OobChannel oobChannel = new OobChannel(
1✔
1431
          authority, balancerRpcExecutorPool, oobTransportFactory.getScheduledExecutorService(),
1✔
1432
          syncContext, callTracerFactory.create(), oobChannelTracer, channelz, timeProvider);
1✔
1433
      channelTracer.reportEvent(new ChannelTrace.Event.Builder()
1✔
1434
          .setDescription("Child OobChannel created")
1✔
1435
          .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
1✔
1436
          .setTimestampNanos(oobChannelCreationTime)
1✔
1437
          .setChannelRef(oobChannel)
1✔
1438
          .build());
1✔
1439
      ChannelTracer subchannelTracer =
1✔
1440
          new ChannelTracer(subchannelLogId, maxTraceEvents, oobChannelCreationTime,
1✔
1441
              "Subchannel for " + addressGroup);
1442
      ChannelLogger subchannelLogger = new ChannelLoggerImpl(subchannelTracer, timeProvider);
1✔
1443
      final class ManagedOobChannelCallback extends InternalSubchannel.Callback {
1✔
1444
        @Override
1445
        void onTerminated(InternalSubchannel is) {
1446
          oobChannels.remove(oobChannel);
1✔
1447
          channelz.removeSubchannel(is);
1✔
1448
          oobChannel.handleSubchannelTerminated();
1✔
1449
          maybeTerminateChannel();
1✔
1450
        }
1✔
1451

1452
        @Override
1453
        void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) {
1454
          // TODO(chengyuanzhang): change to let LB policies explicitly manage OOB channel's
1455
          //  state and refresh name resolution if necessary.
1456
          handleInternalSubchannelState(newState);
1✔
1457
          oobChannel.handleSubchannelStateChange(newState);
1✔
1458
        }
1✔
1459
      }
1460

1461
      final InternalSubchannel internalSubchannel = new InternalSubchannel(
1✔
1462
          CreateSubchannelArgs.newBuilder().setAddresses(addressGroup).build(),
1✔
1463
          authority, userAgent, backoffPolicyProvider, oobTransportFactory,
1✔
1464
          oobTransportFactory.getScheduledExecutorService(), stopwatchSupplier, syncContext,
1✔
1465
          // All callback methods are run from syncContext
1466
          new ManagedOobChannelCallback(),
1467
          channelz,
1✔
1468
          callTracerFactory.create(),
1✔
1469
          subchannelTracer,
1470
          subchannelLogId,
1471
          subchannelLogger,
1472
          transportFilters);
1✔
1473
      oobChannelTracer.reportEvent(new ChannelTrace.Event.Builder()
1✔
1474
          .setDescription("Child Subchannel created")
1✔
1475
          .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
1✔
1476
          .setTimestampNanos(oobChannelCreationTime)
1✔
1477
          .setSubchannelRef(internalSubchannel)
1✔
1478
          .build());
1✔
1479
      channelz.addSubchannel(oobChannel);
1✔
1480
      channelz.addSubchannel(internalSubchannel);
1✔
1481
      oobChannel.setSubchannel(internalSubchannel);
1✔
1482
      final class AddOobChannel implements Runnable {
1✔
1483
        @Override
1484
        public void run() {
1485
          if (terminating) {
1✔
1486
            oobChannel.shutdown();
×
1487
          }
1488
          if (!terminated) {
1✔
1489
            // If channel has not terminated, it will track the subchannel and block termination
1490
            // for it.
1491
            oobChannels.add(oobChannel);
1✔
1492
          }
1493
        }
1✔
1494
      }
1495

1496
      syncContext.execute(new AddOobChannel());
1✔
1497
      return oobChannel;
1✔
1498
    }
1499

1500
    @Deprecated
1501
    @Override
1502
    public ManagedChannelBuilder<?> createResolvingOobChannelBuilder(String target) {
1503
      return createResolvingOobChannelBuilder(target, new DefaultChannelCreds())
1✔
1504
          // Override authority to keep the old behavior.
1505
          // createResolvingOobChannelBuilder(String target) will be deleted soon.
1506
          .overrideAuthority(getAuthority());
1✔
1507
    }
1508

1509
    // TODO(creamsoup) prevent main channel to shutdown if oob channel is not terminated
1510
    // TODO(zdapeng) register the channel as a subchannel of the parent channel in channelz.
1511
    @Override
1512
    public ManagedChannelBuilder<?> createResolvingOobChannelBuilder(
1513
        final String target, final ChannelCredentials channelCreds) {
1514
      checkNotNull(channelCreds, "channelCreds");
1✔
1515

1516
      final class ResolvingOobChannelBuilder
1517
          extends ForwardingChannelBuilder2<ResolvingOobChannelBuilder> {
1518
        final ManagedChannelBuilder<?> delegate;
1519

1520
        ResolvingOobChannelBuilder() {
1✔
1521
          final ClientTransportFactory transportFactory;
1522
          CallCredentials callCredentials;
1523
          if (channelCreds instanceof DefaultChannelCreds) {
1✔
1524
            transportFactory = originalTransportFactory;
1✔
1525
            callCredentials = null;
1✔
1526
          } else {
1527
            SwapChannelCredentialsResult swapResult =
1✔
1528
                originalTransportFactory.swapChannelCredentials(channelCreds);
1✔
1529
            if (swapResult == null) {
1✔
1530
              delegate = Grpc.newChannelBuilder(target, channelCreds);
×
1531
              return;
×
1532
            } else {
1533
              transportFactory = swapResult.transportFactory;
1✔
1534
              callCredentials = swapResult.callCredentials;
1✔
1535
            }
1536
          }
1537
          ClientTransportFactoryBuilder transportFactoryBuilder =
1✔
1538
              new ClientTransportFactoryBuilder() {
1✔
1539
                @Override
1540
                public ClientTransportFactory buildClientTransportFactory() {
1541
                  return transportFactory;
1✔
1542
                }
1543
              };
1544
          delegate = new ManagedChannelImplBuilder(
1✔
1545
              target,
1546
              channelCreds,
1547
              callCredentials,
1548
              transportFactoryBuilder,
1549
              new FixedPortProvider(nameResolverArgs.getDefaultPort()))
1✔
1550
              .nameResolverRegistry(nameResolverRegistry);
1✔
1551
        }
1✔
1552

1553
        @Override
1554
        protected ManagedChannelBuilder<?> delegate() {
1555
          return delegate;
1✔
1556
        }
1557
      }
1558

1559
      checkState(!terminated, "Channel is terminated");
1✔
1560

1561
      @SuppressWarnings("deprecation")
1562
      ResolvingOobChannelBuilder builder = new ResolvingOobChannelBuilder();
1✔
1563

1564
      return builder
1✔
1565
          // TODO(zdapeng): executors should not outlive the parent channel.
1566
          .executor(executor)
1✔
1567
          .offloadExecutor(offloadExecutorHolder.getExecutor())
1✔
1568
          .maxTraceEvents(maxTraceEvents)
1✔
1569
          .proxyDetector(nameResolverArgs.getProxyDetector())
1✔
1570
          .userAgent(userAgent);
1✔
1571
    }
1572

1573
    @Override
1574
    public ChannelCredentials getUnsafeChannelCredentials() {
1575
      if (originalChannelCreds == null) {
1✔
1576
        return new DefaultChannelCreds();
1✔
1577
      }
1578
      return originalChannelCreds;
×
1579
    }
1580

1581
    @Override
1582
    public void updateOobChannelAddresses(ManagedChannel channel, EquivalentAddressGroup eag) {
1583
      updateOobChannelAddresses(channel, Collections.singletonList(eag));
×
1584
    }
×
1585

1586
    @Override
1587
    public void updateOobChannelAddresses(ManagedChannel channel,
1588
        List<EquivalentAddressGroup> eag) {
1589
      checkArgument(channel instanceof OobChannel,
1✔
1590
          "channel must have been returned from createOobChannel");
1591
      ((OobChannel) channel).updateAddresses(eag);
1✔
1592
    }
1✔
1593

1594
    @Override
1595
    public String getAuthority() {
1596
      return ManagedChannelImpl.this.authority();
1✔
1597
    }
1598

1599
    @Override
1600
    public String getChannelTarget() {
1601
      return targetUri.toString();
1✔
1602
    }
1603

1604
    @Override
1605
    public SynchronizationContext getSynchronizationContext() {
1606
      return syncContext;
1✔
1607
    }
1608

1609
    @Override
1610
    public ScheduledExecutorService getScheduledExecutorService() {
1611
      return scheduledExecutor;
1✔
1612
    }
1613

1614
    @Override
1615
    public ChannelLogger getChannelLogger() {
1616
      return channelLogger;
1✔
1617
    }
1618

1619
    @Override
1620
    public NameResolver.Args getNameResolverArgs() {
1621
      return nameResolverArgs;
1✔
1622
    }
1623

1624
    @Override
1625
    public NameResolverRegistry getNameResolverRegistry() {
1626
      return nameResolverRegistry;
1✔
1627
    }
1628

1629
    @Override
1630
    public MetricRecorder getMetricRecorder() {
1631
      return metricRecorder;
1✔
1632
    }
1633

1634
    /**
1635
     * A placeholder for channel creds if user did not specify channel creds for the channel.
1636
     */
1637
    // TODO(zdapeng): get rid of this class and let all ChannelBuilders always provide a non-null
1638
    //     channel creds.
1639
    final class DefaultChannelCreds extends ChannelCredentials {
1✔
1640
      @Override
1641
      public ChannelCredentials withoutBearerTokens() {
1642
        return this;
×
1643
      }
1644
    }
1645
  }
1646

1647
  final class NameResolverListener extends NameResolver.Listener2 {
1648
    final LbHelperImpl helper;
1649
    final NameResolver resolver;
1650

1651
    NameResolverListener(LbHelperImpl helperImpl, NameResolver resolver) {
1✔
1652
      this.helper = checkNotNull(helperImpl, "helperImpl");
1✔
1653
      this.resolver = checkNotNull(resolver, "resolver");
1✔
1654
    }
1✔
1655

1656
    @Override
1657
    public void onResult(final ResolutionResult resolutionResult) {
1658
      final class NamesResolved implements Runnable {
1✔
1659

1660
        @Override
1661
        public void run() {
1662
          Status status = onResult2(resolutionResult);
1✔
1663
          ResolutionResultListener resolutionResultListener = resolutionResult.getAttributes()
1✔
1664
              .get(RetryingNameResolver.RESOLUTION_RESULT_LISTENER_KEY);
1✔
1665
          resolutionResultListener.resolutionAttempted(status);
1✔
1666
        }
1✔
1667
      }
1668

1669
      syncContext.execute(new NamesResolved());
1✔
1670
    }
1✔
1671

1672
    @SuppressWarnings("ReferenceEquality")
1673
    @Override
1674
    public Status onResult2(final ResolutionResult resolutionResult) {
1675
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1676
      if (ManagedChannelImpl.this.nameResolver != resolver) {
1✔
1677
        return Status.OK;
1✔
1678
      }
1679

1680
      StatusOr<List<EquivalentAddressGroup>> serversOrError =
1✔
1681
          resolutionResult.getAddressesOrError();
1✔
1682
      if (!serversOrError.hasValue()) {
1✔
1683
        handleErrorInSyncContext(serversOrError.getStatus());
1✔
1684
        return serversOrError.getStatus();
1✔
1685
      }
1686
      List<EquivalentAddressGroup> servers = serversOrError.getValue();
1✔
1687
      channelLogger.log(
1✔
1688
          ChannelLogLevel.DEBUG,
1689
          "Resolved address: {0}, config={1}",
1690
          servers,
1691
          resolutionResult.getAttributes());
1✔
1692

1693
      if (lastResolutionState != ResolutionState.SUCCESS) {
1✔
1694
        channelLogger.log(ChannelLogLevel.INFO, "Address resolved: {0}",
1✔
1695
            servers);
1696
        lastResolutionState = ResolutionState.SUCCESS;
1✔
1697
      }
1698
      ConfigOrError configOrError = resolutionResult.getServiceConfig();
1✔
1699
      InternalConfigSelector resolvedConfigSelector =
1✔
1700
          resolutionResult.getAttributes().get(InternalConfigSelector.KEY);
1✔
1701
      ManagedChannelServiceConfig validServiceConfig =
1702
          configOrError != null && configOrError.getConfig() != null
1✔
1703
              ? (ManagedChannelServiceConfig) configOrError.getConfig()
1✔
1704
              : null;
1✔
1705
      Status serviceConfigError = configOrError != null ? configOrError.getError() : null;
1✔
1706

1707
      ManagedChannelServiceConfig effectiveServiceConfig;
1708
      if (!lookUpServiceConfig) {
1✔
1709
        if (validServiceConfig != null) {
1✔
1710
          channelLogger.log(
1✔
1711
              ChannelLogLevel.INFO,
1712
              "Service config from name resolver discarded by channel settings");
1713
        }
1714
        effectiveServiceConfig =
1715
            defaultServiceConfig == null ? EMPTY_SERVICE_CONFIG : defaultServiceConfig;
1✔
1716
        if (resolvedConfigSelector != null) {
1✔
1717
          channelLogger.log(
1✔
1718
              ChannelLogLevel.INFO,
1719
              "Config selector from name resolver discarded by channel settings");
1720
        }
1721
        realChannel.updateConfigSelector(effectiveServiceConfig.getDefaultConfigSelector());
1✔
1722
      } else {
1723
        // Try to use config if returned from name resolver
1724
        // Otherwise, try to use the default config if available
1725
        if (validServiceConfig != null) {
1✔
1726
          effectiveServiceConfig = validServiceConfig;
1✔
1727
          if (resolvedConfigSelector != null) {
1✔
1728
            realChannel.updateConfigSelector(resolvedConfigSelector);
1✔
1729
            if (effectiveServiceConfig.getDefaultConfigSelector() != null) {
1✔
1730
              channelLogger.log(
×
1731
                  ChannelLogLevel.DEBUG,
1732
                  "Method configs in service config will be discarded due to presence of"
1733
                      + "config-selector");
1734
            }
1735
          } else {
1736
            realChannel.updateConfigSelector(effectiveServiceConfig.getDefaultConfigSelector());
1✔
1737
          }
1738
        } else if (defaultServiceConfig != null) {
1✔
1739
          effectiveServiceConfig = defaultServiceConfig;
1✔
1740
          realChannel.updateConfigSelector(effectiveServiceConfig.getDefaultConfigSelector());
1✔
1741
          channelLogger.log(
1✔
1742
              ChannelLogLevel.INFO,
1743
              "Received no service config, using default service config");
1744
        } else if (serviceConfigError != null) {
1✔
1745
          if (!serviceConfigUpdated) {
1✔
1746
            // First DNS lookup has invalid service config, and cannot fall back to default
1747
            channelLogger.log(
1✔
1748
                ChannelLogLevel.INFO,
1749
                "Fallback to error due to invalid first service config without default config");
1750
            // This error could be an "inappropriate" control plane error that should not bleed
1751
            // through to client code using gRPC. We let them flow through here to the LB as
1752
            // we later check for these error codes when investigating pick results in
1753
            // GrpcUtil.getTransportFromPickResult().
1754
            onError(configOrError.getError());
1✔
1755
            return configOrError.getError();
1✔
1756
          } else {
1757
            effectiveServiceConfig = lastServiceConfig;
1✔
1758
          }
1759
        } else {
1760
          effectiveServiceConfig = EMPTY_SERVICE_CONFIG;
1✔
1761
          realChannel.updateConfigSelector(null);
1✔
1762
        }
1763
        if (!effectiveServiceConfig.equals(lastServiceConfig)) {
1✔
1764
          channelLogger.log(
1✔
1765
              ChannelLogLevel.INFO,
1766
              "Service config changed{0}",
1767
              effectiveServiceConfig == EMPTY_SERVICE_CONFIG ? " to empty" : "");
1✔
1768
          lastServiceConfig = effectiveServiceConfig;
1✔
1769
          transportProvider.throttle = effectiveServiceConfig.getRetryThrottling();
1✔
1770
        }
1771

1772
        try {
1773
          // TODO(creamsoup): when `serversOrError` is empty and lastResolutionStateCopy == SUCCESS
1774
          //  and lbNeedAddress, it shouldn't call the handleServiceConfigUpdate. But,
1775
          //  lbNeedAddress is not deterministic
1776
          serviceConfigUpdated = true;
1✔
1777
        } catch (RuntimeException re) {
×
1778
          logger.log(
×
1779
              Level.WARNING,
1780
              "[" + getLogId() + "] Unexpected exception from parsing service config",
×
1781
              re);
1782
        }
1✔
1783
      }
1784

1785
      Attributes effectiveAttrs = resolutionResult.getAttributes();
1✔
1786
      // Call LB only if it's not shutdown.  If LB is shutdown, lbHelper won't match.
1787
      if (NameResolverListener.this.helper == ManagedChannelImpl.this.lbHelper) {
1✔
1788
        Attributes.Builder attrBuilder =
1✔
1789
            effectiveAttrs.toBuilder().discard(InternalConfigSelector.KEY);
1✔
1790
        Map<String, ?> healthCheckingConfig =
1✔
1791
            effectiveServiceConfig.getHealthCheckingConfig();
1✔
1792
        if (healthCheckingConfig != null) {
1✔
1793
          attrBuilder
1✔
1794
              .set(LoadBalancer.ATTR_HEALTH_CHECKING_CONFIG, healthCheckingConfig)
1✔
1795
              .build();
1✔
1796
        }
1797
        Attributes attributes = attrBuilder.build();
1✔
1798

1799
        ResolvedAddresses.Builder resolvedAddresses = ResolvedAddresses.newBuilder()
1✔
1800
            .setAddresses(serversOrError.getValue())
1✔
1801
            .setAttributes(attributes)
1✔
1802
            .setLoadBalancingPolicyConfig(effectiveServiceConfig.getLoadBalancingConfig());
1✔
1803
        Status addressAcceptanceStatus = helper.lb.tryAcceptResolvedAddresses(
1✔
1804
            resolvedAddresses.build());
1✔
1805
        return addressAcceptanceStatus;
1✔
1806
      }
1807
      return Status.OK;
×
1808
    }
1809

1810
    @Override
1811
    public void onError(final Status error) {
1812
      checkArgument(!error.isOk(), "the error status must not be OK");
1✔
1813
      final class NameResolverErrorHandler implements Runnable {
1✔
1814
        @Override
1815
        public void run() {
1816
          handleErrorInSyncContext(error);
1✔
1817
        }
1✔
1818
      }
1819

1820
      syncContext.execute(new NameResolverErrorHandler());
1✔
1821
    }
1✔
1822

1823
    private void handleErrorInSyncContext(Status error) {
1824
      logger.log(Level.WARNING, "[{0}] Failed to resolve name. status={1}",
1✔
1825
          new Object[] {getLogId(), error});
1✔
1826
      realChannel.onConfigError();
1✔
1827
      if (lastResolutionState != ResolutionState.ERROR) {
1✔
1828
        channelLogger.log(ChannelLogLevel.WARNING, "Failed to resolve name: {0}", error);
1✔
1829
        lastResolutionState = ResolutionState.ERROR;
1✔
1830
      }
1831
      // Call LB only if it's not shutdown.  If LB is shutdown, lbHelper won't match.
1832
      if (NameResolverListener.this.helper != ManagedChannelImpl.this.lbHelper) {
1✔
1833
        return;
1✔
1834
      }
1835

1836
      helper.lb.handleNameResolutionError(error);
1✔
1837
    }
1✔
1838
  }
1839

1840
  private final class SubchannelImpl extends AbstractSubchannel {
1841
    final CreateSubchannelArgs args;
1842
    final InternalLogId subchannelLogId;
1843
    final ChannelLoggerImpl subchannelLogger;
1844
    final ChannelTracer subchannelTracer;
1845
    List<EquivalentAddressGroup> addressGroups;
1846
    InternalSubchannel subchannel;
1847
    boolean started;
1848
    boolean shutdown;
1849
    ScheduledHandle delayedShutdownTask;
1850

1851
    SubchannelImpl(CreateSubchannelArgs args) {
1✔
1852
      checkNotNull(args, "args");
1✔
1853
      addressGroups = args.getAddresses();
1✔
1854
      if (authorityOverride != null) {
1✔
1855
        List<EquivalentAddressGroup> eagsWithoutOverrideAttr =
1✔
1856
            stripOverrideAuthorityAttributes(args.getAddresses());
1✔
1857
        args = args.toBuilder().setAddresses(eagsWithoutOverrideAttr).build();
1✔
1858
      }
1859
      this.args = args;
1✔
1860
      subchannelLogId = InternalLogId.allocate("Subchannel", /*details=*/ authority());
1✔
1861
      subchannelTracer = new ChannelTracer(
1✔
1862
          subchannelLogId, maxTraceEvents, timeProvider.currentTimeNanos(),
1✔
1863
          "Subchannel for " + args.getAddresses());
1✔
1864
      subchannelLogger = new ChannelLoggerImpl(subchannelTracer, timeProvider);
1✔
1865
    }
1✔
1866

1867
    @Override
1868
    public void start(final SubchannelStateListener listener) {
1869
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1870
      checkState(!started, "already started");
1✔
1871
      checkState(!shutdown, "already shutdown");
1✔
1872
      checkState(!terminating, "Channel is being terminated");
1✔
1873
      started = true;
1✔
1874
      final class ManagedInternalSubchannelCallback extends InternalSubchannel.Callback {
1✔
1875
        // All callbacks are run in syncContext
1876
        @Override
1877
        void onTerminated(InternalSubchannel is) {
1878
          subchannels.remove(is);
1✔
1879
          channelz.removeSubchannel(is);
1✔
1880
          maybeTerminateChannel();
1✔
1881
        }
1✔
1882

1883
        @Override
1884
        void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) {
1885
          checkState(listener != null, "listener is null");
1✔
1886
          listener.onSubchannelState(newState);
1✔
1887
        }
1✔
1888

1889
        @Override
1890
        void onInUse(InternalSubchannel is) {
1891
          inUseStateAggregator.updateObjectInUse(is, true);
1✔
1892
        }
1✔
1893

1894
        @Override
1895
        void onNotInUse(InternalSubchannel is) {
1896
          inUseStateAggregator.updateObjectInUse(is, false);
1✔
1897
        }
1✔
1898
      }
1899

1900
      final InternalSubchannel internalSubchannel = new InternalSubchannel(
1✔
1901
          args,
1902
          authority(),
1✔
1903
          userAgent,
1✔
1904
          backoffPolicyProvider,
1✔
1905
          transportFactory,
1✔
1906
          transportFactory.getScheduledExecutorService(),
1✔
1907
          stopwatchSupplier,
1✔
1908
          syncContext,
1909
          new ManagedInternalSubchannelCallback(),
1910
          channelz,
1✔
1911
          callTracerFactory.create(),
1✔
1912
          subchannelTracer,
1913
          subchannelLogId,
1914
          subchannelLogger,
1915
          transportFilters);
1✔
1916

1917
      channelTracer.reportEvent(new ChannelTrace.Event.Builder()
1✔
1918
          .setDescription("Child Subchannel started")
1✔
1919
          .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
1✔
1920
          .setTimestampNanos(timeProvider.currentTimeNanos())
1✔
1921
          .setSubchannelRef(internalSubchannel)
1✔
1922
          .build());
1✔
1923

1924
      this.subchannel = internalSubchannel;
1✔
1925
      channelz.addSubchannel(internalSubchannel);
1✔
1926
      subchannels.add(internalSubchannel);
1✔
1927
    }
1✔
1928

1929
    @Override
1930
    InternalInstrumented<ChannelStats> getInstrumentedInternalSubchannel() {
1931
      checkState(started, "not started");
1✔
1932
      return subchannel;
1✔
1933
    }
1934

1935
    @Override
1936
    public void shutdown() {
1937
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1938
      if (subchannel == null) {
1✔
1939
        // start() was not successful
1940
        shutdown = true;
×
1941
        return;
×
1942
      }
1943
      if (shutdown) {
1✔
1944
        if (terminating && delayedShutdownTask != null) {
1✔
1945
          // shutdown() was previously called when terminating == false, thus a delayed shutdown()
1946
          // was scheduled.  Now since terminating == true, We should expedite the shutdown.
1947
          delayedShutdownTask.cancel();
×
1948
          delayedShutdownTask = null;
×
1949
          // Will fall through to the subchannel.shutdown() at the end.
1950
        } else {
1951
          return;
1✔
1952
        }
1953
      } else {
1954
        shutdown = true;
1✔
1955
      }
1956
      // Add a delay to shutdown to deal with the race between 1) a transport being picked and
1957
      // newStream() being called on it, and 2) its Subchannel is shut down by LoadBalancer (e.g.,
1958
      // because of address change, or because LoadBalancer is shutdown by Channel entering idle
1959
      // mode). If (2) wins, the app will see a spurious error. We work around this by delaying
1960
      // shutdown of Subchannel for a few seconds here.
1961
      //
1962
      // TODO(zhangkun83): consider a better approach
1963
      // (https://github.com/grpc/grpc-java/issues/2562).
1964
      if (!terminating) {
1✔
1965
        final class ShutdownSubchannel implements Runnable {
1✔
1966
          @Override
1967
          public void run() {
1968
            subchannel.shutdown(SUBCHANNEL_SHUTDOWN_STATUS);
1✔
1969
          }
1✔
1970
        }
1971

1972
        delayedShutdownTask = syncContext.schedule(
1✔
1973
            new LogExceptionRunnable(new ShutdownSubchannel()),
1974
            SUBCHANNEL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS,
1975
            transportFactory.getScheduledExecutorService());
1✔
1976
        return;
1✔
1977
      }
1978
      // When terminating == true, no more real streams will be created. It's safe and also
1979
      // desirable to shutdown timely.
1980
      subchannel.shutdown(SHUTDOWN_STATUS);
1✔
1981
    }
1✔
1982

1983
    @Override
1984
    public void requestConnection() {
1985
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1986
      checkState(started, "not started");
1✔
1987
      subchannel.obtainActiveTransport();
1✔
1988
    }
1✔
1989

1990
    @Override
1991
    public List<EquivalentAddressGroup> getAllAddresses() {
1992
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1993
      checkState(started, "not started");
1✔
1994
      return addressGroups;
1✔
1995
    }
1996

1997
    @Override
1998
    public Attributes getAttributes() {
1999
      return args.getAttributes();
1✔
2000
    }
2001

2002
    @Override
2003
    public String toString() {
2004
      return subchannelLogId.toString();
1✔
2005
    }
2006

2007
    @Override
2008
    public Channel asChannel() {
2009
      checkState(started, "not started");
1✔
2010
      return new SubchannelChannel(
1✔
2011
          subchannel, balancerRpcExecutorHolder.getExecutor(),
1✔
2012
          transportFactory.getScheduledExecutorService(),
1✔
2013
          callTracerFactory.create(),
1✔
2014
          new AtomicReference<InternalConfigSelector>(null));
2015
    }
2016

2017
    @Override
2018
    public Object getInternalSubchannel() {
2019
      checkState(started, "Subchannel is not started");
1✔
2020
      return subchannel;
1✔
2021
    }
2022

2023
    @Override
2024
    public ChannelLogger getChannelLogger() {
2025
      return subchannelLogger;
1✔
2026
    }
2027

2028
    @Override
2029
    public void updateAddresses(List<EquivalentAddressGroup> addrs) {
2030
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
2031
      addressGroups = addrs;
1✔
2032
      if (authorityOverride != null) {
1✔
2033
        addrs = stripOverrideAuthorityAttributes(addrs);
1✔
2034
      }
2035
      subchannel.updateAddresses(addrs);
1✔
2036
    }
1✔
2037

2038
    @Override
2039
    public Attributes getConnectedAddressAttributes() {
2040
      return subchannel.getConnectedAddressAttributes();
1✔
2041
    }
2042

2043
    private List<EquivalentAddressGroup> stripOverrideAuthorityAttributes(
2044
        List<EquivalentAddressGroup> eags) {
2045
      List<EquivalentAddressGroup> eagsWithoutOverrideAttr = new ArrayList<>();
1✔
2046
      for (EquivalentAddressGroup eag : eags) {
1✔
2047
        EquivalentAddressGroup eagWithoutOverrideAttr = new EquivalentAddressGroup(
1✔
2048
            eag.getAddresses(),
1✔
2049
            eag.getAttributes().toBuilder().discard(ATTR_AUTHORITY_OVERRIDE).build());
1✔
2050
        eagsWithoutOverrideAttr.add(eagWithoutOverrideAttr);
1✔
2051
      }
1✔
2052
      return Collections.unmodifiableList(eagsWithoutOverrideAttr);
1✔
2053
    }
2054
  }
2055

2056
  @Override
2057
  public String toString() {
2058
    return MoreObjects.toStringHelper(this)
1✔
2059
        .add("logId", logId.getId())
1✔
2060
        .add("target", target)
1✔
2061
        .toString();
1✔
2062
  }
2063

2064
  /**
2065
   * Called from syncContext.
2066
   */
2067
  private final class DelayedTransportListener implements ManagedClientTransport.Listener {
1✔
2068
    @Override
2069
    public void transportShutdown(Status s) {
2070
      checkState(shutdown.get(), "Channel must have been shut down");
1✔
2071
    }
1✔
2072

2073
    @Override
2074
    public void transportReady() {
2075
      // Don't care
2076
    }
×
2077

2078
    @Override
2079
    public Attributes filterTransport(Attributes attributes) {
2080
      return attributes;
×
2081
    }
2082

2083
    @Override
2084
    public void transportInUse(final boolean inUse) {
2085
      inUseStateAggregator.updateObjectInUse(delayedTransport, inUse);
1✔
2086
      if (inUse) {
1✔
2087
        // It's possible to be in idle mode while inUseStateAggregator is in-use, if one of the
2088
        // subchannels is in use. But we should never be in idle mode when delayed transport is in
2089
        // use.
2090
        exitIdleMode();
1✔
2091
      }
2092
    }
1✔
2093

2094
    @Override
2095
    public void transportTerminated() {
2096
      checkState(shutdown.get(), "Channel must have been shut down");
1✔
2097
      terminating = true;
1✔
2098
      shutdownNameResolverAndLoadBalancer(false);
1✔
2099
      // No need to call channelStateManager since we are already in SHUTDOWN state.
2100
      // Until LoadBalancer is shutdown, it may still create new subchannels.  We catch them
2101
      // here.
2102
      maybeShutdownNowSubchannels();
1✔
2103
      maybeTerminateChannel();
1✔
2104
    }
1✔
2105
  }
2106

2107
  /**
2108
   * Must be accessed from syncContext.
2109
   */
2110
  private final class IdleModeStateAggregator extends InUseStateAggregator<Object> {
1✔
2111
    @Override
2112
    protected void handleInUse() {
2113
      exitIdleMode();
1✔
2114
    }
1✔
2115

2116
    @Override
2117
    protected void handleNotInUse() {
2118
      if (shutdown.get()) {
1✔
2119
        return;
1✔
2120
      }
2121
      rescheduleIdleTimer();
1✔
2122
    }
1✔
2123
  }
2124

2125
  /**
2126
   * Lazily request for Executor from an executor pool.
2127
   * Also act as an Executor directly to simply run a cmd
2128
   */
2129
  @VisibleForTesting
2130
  static final class ExecutorHolder implements Executor {
2131
    private final ObjectPool<? extends Executor> pool;
2132
    private Executor executor;
2133

2134
    ExecutorHolder(ObjectPool<? extends Executor> executorPool) {
1✔
2135
      this.pool = checkNotNull(executorPool, "executorPool");
1✔
2136
    }
1✔
2137

2138
    synchronized Executor getExecutor() {
2139
      if (executor == null) {
1✔
2140
        executor = checkNotNull(pool.getObject(), "%s.getObject()", executor);
1✔
2141
      }
2142
      return executor;
1✔
2143
    }
2144

2145
    synchronized void release() {
2146
      if (executor != null) {
1✔
2147
        executor = pool.returnObject(executor);
1✔
2148
      }
2149
    }
1✔
2150

2151
    @Override
2152
    public void execute(Runnable command) {
2153
      getExecutor().execute(command);
1✔
2154
    }
1✔
2155
  }
2156

2157
  private static final class RestrictedScheduledExecutor implements ScheduledExecutorService {
2158
    final ScheduledExecutorService delegate;
2159

2160
    private RestrictedScheduledExecutor(ScheduledExecutorService delegate) {
1✔
2161
      this.delegate = checkNotNull(delegate, "delegate");
1✔
2162
    }
1✔
2163

2164
    @Override
2165
    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
2166
      return delegate.schedule(callable, delay, unit);
×
2167
    }
2168

2169
    @Override
2170
    public ScheduledFuture<?> schedule(Runnable cmd, long delay, TimeUnit unit) {
2171
      return delegate.schedule(cmd, delay, unit);
1✔
2172
    }
2173

2174
    @Override
2175
    public ScheduledFuture<?> scheduleAtFixedRate(
2176
        Runnable command, long initialDelay, long period, TimeUnit unit) {
2177
      return delegate.scheduleAtFixedRate(command, initialDelay, period, unit);
1✔
2178
    }
2179

2180
    @Override
2181
    public ScheduledFuture<?> scheduleWithFixedDelay(
2182
        Runnable command, long initialDelay, long delay, TimeUnit unit) {
2183
      return delegate.scheduleWithFixedDelay(command, initialDelay, delay, unit);
×
2184
    }
2185

2186
    @Override
2187
    public boolean awaitTermination(long timeout, TimeUnit unit)
2188
        throws InterruptedException {
2189
      return delegate.awaitTermination(timeout, unit);
×
2190
    }
2191

2192
    @Override
2193
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
2194
        throws InterruptedException {
2195
      return delegate.invokeAll(tasks);
×
2196
    }
2197

2198
    @Override
2199
    public <T> List<Future<T>> invokeAll(
2200
        Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
2201
        throws InterruptedException {
2202
      return delegate.invokeAll(tasks, timeout, unit);
×
2203
    }
2204

2205
    @Override
2206
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
2207
        throws InterruptedException, ExecutionException {
2208
      return delegate.invokeAny(tasks);
×
2209
    }
2210

2211
    @Override
2212
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
2213
        throws InterruptedException, ExecutionException, TimeoutException {
2214
      return delegate.invokeAny(tasks, timeout, unit);
×
2215
    }
2216

2217
    @Override
2218
    public boolean isShutdown() {
2219
      return delegate.isShutdown();
×
2220
    }
2221

2222
    @Override
2223
    public boolean isTerminated() {
2224
      return delegate.isTerminated();
×
2225
    }
2226

2227
    @Override
2228
    public void shutdown() {
2229
      throw new UnsupportedOperationException("Restricted: shutdown() is not allowed");
1✔
2230
    }
2231

2232
    @Override
2233
    public List<Runnable> shutdownNow() {
2234
      throw new UnsupportedOperationException("Restricted: shutdownNow() is not allowed");
1✔
2235
    }
2236

2237
    @Override
2238
    public <T> Future<T> submit(Callable<T> task) {
2239
      return delegate.submit(task);
×
2240
    }
2241

2242
    @Override
2243
    public Future<?> submit(Runnable task) {
2244
      return delegate.submit(task);
×
2245
    }
2246

2247
    @Override
2248
    public <T> Future<T> submit(Runnable task, T result) {
2249
      return delegate.submit(task, result);
×
2250
    }
2251

2252
    @Override
2253
    public void execute(Runnable command) {
2254
      delegate.execute(command);
×
2255
    }
×
2256
  }
2257

2258
  /**
2259
   * A ResolutionState indicates the status of last name resolution.
2260
   */
2261
  enum ResolutionState {
1✔
2262
    NO_RESOLUTION,
1✔
2263
    SUCCESS,
1✔
2264
    ERROR
1✔
2265
  }
2266
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc