• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19497

07 Oct 2024 12:25PM UTC coverage: 84.656% (+0.06%) from 84.599%
#19497

push

github

web-flow
On result2 resolution result have addresses or error (#11330)

Combined success / error status passed via ResolutionResult to the NameResolver.Listener2 interface's onResult2 method - Addresses in the success case or address resolution error in the failure case now get set in ResolutionResult::addressesOrError by the internal name resolvers.

33771 of 39892 relevant lines covered (84.66%)

0.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.74
/../core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
1
/*
2
 * Copyright 2016 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static com.google.common.base.Preconditions.checkState;
22
import static io.grpc.ClientStreamTracer.NAME_RESOLUTION_DELAYED;
23
import static io.grpc.ConnectivityState.CONNECTING;
24
import static io.grpc.ConnectivityState.IDLE;
25
import static io.grpc.ConnectivityState.SHUTDOWN;
26
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
27
import static io.grpc.EquivalentAddressGroup.ATTR_AUTHORITY_OVERRIDE;
28

29
import com.google.common.annotations.VisibleForTesting;
30
import com.google.common.base.MoreObjects;
31
import com.google.common.base.Stopwatch;
32
import com.google.common.base.Supplier;
33
import com.google.common.util.concurrent.ListenableFuture;
34
import com.google.common.util.concurrent.SettableFuture;
35
import io.grpc.Attributes;
36
import io.grpc.CallCredentials;
37
import io.grpc.CallOptions;
38
import io.grpc.Channel;
39
import io.grpc.ChannelCredentials;
40
import io.grpc.ChannelLogger;
41
import io.grpc.ChannelLogger.ChannelLogLevel;
42
import io.grpc.ClientCall;
43
import io.grpc.ClientInterceptor;
44
import io.grpc.ClientInterceptors;
45
import io.grpc.ClientStreamTracer;
46
import io.grpc.ClientTransportFilter;
47
import io.grpc.CompressorRegistry;
48
import io.grpc.ConnectivityState;
49
import io.grpc.ConnectivityStateInfo;
50
import io.grpc.Context;
51
import io.grpc.Deadline;
52
import io.grpc.DecompressorRegistry;
53
import io.grpc.EquivalentAddressGroup;
54
import io.grpc.ForwardingChannelBuilder2;
55
import io.grpc.ForwardingClientCall;
56
import io.grpc.Grpc;
57
import io.grpc.InternalChannelz;
58
import io.grpc.InternalChannelz.ChannelStats;
59
import io.grpc.InternalChannelz.ChannelTrace;
60
import io.grpc.InternalConfigSelector;
61
import io.grpc.InternalInstrumented;
62
import io.grpc.InternalLogId;
63
import io.grpc.InternalWithLogId;
64
import io.grpc.LoadBalancer;
65
import io.grpc.LoadBalancer.CreateSubchannelArgs;
66
import io.grpc.LoadBalancer.PickResult;
67
import io.grpc.LoadBalancer.PickSubchannelArgs;
68
import io.grpc.LoadBalancer.ResolvedAddresses;
69
import io.grpc.LoadBalancer.SubchannelPicker;
70
import io.grpc.LoadBalancer.SubchannelStateListener;
71
import io.grpc.ManagedChannel;
72
import io.grpc.ManagedChannelBuilder;
73
import io.grpc.Metadata;
74
import io.grpc.MethodDescriptor;
75
import io.grpc.MetricInstrumentRegistry;
76
import io.grpc.MetricRecorder;
77
import io.grpc.NameResolver;
78
import io.grpc.NameResolver.ConfigOrError;
79
import io.grpc.NameResolver.ResolutionResult;
80
import io.grpc.NameResolverProvider;
81
import io.grpc.NameResolverRegistry;
82
import io.grpc.ProxyDetector;
83
import io.grpc.Status;
84
import io.grpc.StatusOr;
85
import io.grpc.SynchronizationContext;
86
import io.grpc.SynchronizationContext.ScheduledHandle;
87
import io.grpc.internal.AutoConfiguredLoadBalancerFactory.AutoConfiguredLoadBalancer;
88
import io.grpc.internal.ClientCallImpl.ClientStreamProvider;
89
import io.grpc.internal.ClientTransportFactory.SwapChannelCredentialsResult;
90
import io.grpc.internal.ManagedChannelImplBuilder.ClientTransportFactoryBuilder;
91
import io.grpc.internal.ManagedChannelImplBuilder.FixedPortProvider;
92
import io.grpc.internal.ManagedChannelServiceConfig.MethodInfo;
93
import io.grpc.internal.ManagedChannelServiceConfig.ServiceConfigConvertedSelector;
94
import io.grpc.internal.RetriableStream.ChannelBufferMeter;
95
import io.grpc.internal.RetriableStream.Throttle;
96
import io.grpc.internal.RetryingNameResolver.ResolutionResultListener;
97
import java.net.URI;
98
import java.util.ArrayList;
99
import java.util.Collection;
100
import java.util.Collections;
101
import java.util.HashSet;
102
import java.util.LinkedHashSet;
103
import java.util.List;
104
import java.util.Map;
105
import java.util.Set;
106
import java.util.concurrent.Callable;
107
import java.util.concurrent.CountDownLatch;
108
import java.util.concurrent.ExecutionException;
109
import java.util.concurrent.Executor;
110
import java.util.concurrent.Future;
111
import java.util.concurrent.ScheduledExecutorService;
112
import java.util.concurrent.ScheduledFuture;
113
import java.util.concurrent.TimeUnit;
114
import java.util.concurrent.TimeoutException;
115
import java.util.concurrent.atomic.AtomicBoolean;
116
import java.util.concurrent.atomic.AtomicReference;
117
import java.util.logging.Level;
118
import java.util.logging.Logger;
119
import javax.annotation.Nullable;
120
import javax.annotation.concurrent.GuardedBy;
121
import javax.annotation.concurrent.ThreadSafe;
122

123
/** A communication channel for making outgoing RPCs. */
124
@ThreadSafe
125
final class ManagedChannelImpl extends ManagedChannel implements
126
    InternalInstrumented<ChannelStats> {
127
  @VisibleForTesting
128
  static final Logger logger = Logger.getLogger(ManagedChannelImpl.class.getName());
1✔
129

130
  static final long IDLE_TIMEOUT_MILLIS_DISABLE = -1;
131

132
  static final long SUBCHANNEL_SHUTDOWN_DELAY_SECONDS = 5;
133

134
  @VisibleForTesting
135
  static final Status SHUTDOWN_NOW_STATUS =
1✔
136
      Status.UNAVAILABLE.withDescription("Channel shutdownNow invoked");
1✔
137

138
  @VisibleForTesting
139
  static final Status SHUTDOWN_STATUS =
1✔
140
      Status.UNAVAILABLE.withDescription("Channel shutdown invoked");
1✔
141

142
  @VisibleForTesting
143
  static final Status SUBCHANNEL_SHUTDOWN_STATUS =
1✔
144
      Status.UNAVAILABLE.withDescription("Subchannel shutdown invoked");
1✔
145

146
  private static final ManagedChannelServiceConfig EMPTY_SERVICE_CONFIG =
147
      ManagedChannelServiceConfig.empty();
1✔
148
  private static final InternalConfigSelector INITIAL_PENDING_SELECTOR =
1✔
149
      new InternalConfigSelector() {
1✔
150
        @Override
151
        public Result selectConfig(PickSubchannelArgs args) {
152
          throw new IllegalStateException("Resolution is pending");
×
153
        }
154
      };
155
  private static final LoadBalancer.PickDetailsConsumer NOOP_PICK_DETAILS_CONSUMER =
1✔
156
      new LoadBalancer.PickDetailsConsumer() {};
1✔
157

158
  private final InternalLogId logId;
159
  private final String target;
160
  @Nullable
161
  private final String authorityOverride;
162
  private final NameResolverRegistry nameResolverRegistry;
163
  private final URI targetUri;
164
  private final NameResolverProvider nameResolverProvider;
165
  private final NameResolver.Args nameResolverArgs;
166
  private final AutoConfiguredLoadBalancerFactory loadBalancerFactory;
167
  private final ClientTransportFactory originalTransportFactory;
168
  @Nullable
169
  private final ChannelCredentials originalChannelCreds;
170
  private final ClientTransportFactory transportFactory;
171
  private final ClientTransportFactory oobTransportFactory;
172
  private final RestrictedScheduledExecutor scheduledExecutor;
173
  private final Executor executor;
174
  private final ObjectPool<? extends Executor> executorPool;
175
  private final ObjectPool<? extends Executor> balancerRpcExecutorPool;
176
  private final ExecutorHolder balancerRpcExecutorHolder;
177
  private final ExecutorHolder offloadExecutorHolder;
178
  private final TimeProvider timeProvider;
179
  private final int maxTraceEvents;
180

181
  @VisibleForTesting
1✔
182
  final SynchronizationContext syncContext = new SynchronizationContext(
183
      new Thread.UncaughtExceptionHandler() {
1✔
184
        @Override
185
        public void uncaughtException(Thread t, Throwable e) {
186
          logger.log(
1✔
187
              Level.SEVERE,
188
              "[" + getLogId() + "] Uncaught exception in the SynchronizationContext. Panic!",
1✔
189
              e);
190
          panic(e);
1✔
191
        }
1✔
192
      });
193

194
  private boolean fullStreamDecompression;
195

196
  private final DecompressorRegistry decompressorRegistry;
197
  private final CompressorRegistry compressorRegistry;
198

199
  private final Supplier<Stopwatch> stopwatchSupplier;
200
  /** The timeout before entering idle mode. */
201
  private final long idleTimeoutMillis;
202

203
  private final ConnectivityStateManager channelStateManager = new ConnectivityStateManager();
1✔
204
  private final BackoffPolicy.Provider backoffPolicyProvider;
205

206
  /**
207
   * We delegate to this channel, so that we can have interceptors as necessary. If there aren't
208
   * any interceptors and the {@link io.grpc.BinaryLog} is {@code null} then this will just be a
209
   * {@link RealChannel}.
210
   */
211
  private final Channel interceptorChannel;
212

213
  private final List<ClientTransportFilter> transportFilters;
214
  @Nullable private final String userAgent;
215

216
  // Only null after channel is terminated. Must be assigned from the syncContext.
217
  private NameResolver nameResolver;
218

219
  // Must be accessed from the syncContext.
220
  private boolean nameResolverStarted;
221

222
  // null when channel is in idle mode.  Must be assigned from syncContext.
223
  @Nullable
224
  private LbHelperImpl lbHelper;
225

226
  // Must ONLY be assigned from updateSubchannelPicker(), which is called from syncContext.
227
  // null if channel is in idle mode.
228
  @Nullable
229
  private volatile SubchannelPicker subchannelPicker;
230

231
  // Must be accessed from the syncContext
232
  private boolean panicMode;
233

234
  // Must be mutated from syncContext
235
  // If any monitoring hook to be added later needs to get a snapshot of this Set, we could
236
  // switch to a ConcurrentHashMap.
237
  private final Set<InternalSubchannel> subchannels = new HashSet<>(16, .75f);
1✔
238

239
  // Must be accessed from syncContext
240
  @Nullable
241
  private Collection<RealChannel.PendingCall<?, ?>> pendingCalls;
242
  private final Object pendingCallsInUseObject = new Object();
1✔
243

244
  // Must be mutated from syncContext
245
  private final Set<OobChannel> oobChannels = new HashSet<>(1, .75f);
1✔
246

247
  // reprocess() must be run from syncContext
248
  private final DelayedClientTransport delayedTransport;
249
  private final UncommittedRetriableStreamsRegistry uncommittedRetriableStreamsRegistry
1✔
250
      = new UncommittedRetriableStreamsRegistry();
251

252
  // Shutdown states.
253
  //
254
  // Channel's shutdown process:
255
  // 1. shutdown(): stop accepting new calls from applications
256
  //   1a shutdown <- true
257
  //   1b subchannelPicker <- null
258
  //   1c delayedTransport.shutdown()
259
  // 2. delayedTransport terminated: stop stream-creation functionality
260
  //   2a terminating <- true
261
  //   2b loadBalancer.shutdown()
262
  //     * LoadBalancer will shutdown subchannels and OOB channels
263
  //   2c loadBalancer <- null
264
  //   2d nameResolver.shutdown()
265
  //   2e nameResolver <- null
266
  // 3. All subchannels and OOB channels terminated: Channel considered terminated
267

268
  private final AtomicBoolean shutdown = new AtomicBoolean(false);
1✔
269
  // Must only be mutated and read from syncContext
270
  private boolean shutdownNowed;
271
  // Must only be mutated from syncContext
272
  private boolean terminating;
273
  // Must be mutated from syncContext
274
  private volatile boolean terminated;
275
  private final CountDownLatch terminatedLatch = new CountDownLatch(1);
1✔
276

277
  private final CallTracer.Factory callTracerFactory;
278
  private final CallTracer channelCallTracer;
279
  private final ChannelTracer channelTracer;
280
  private final ChannelLogger channelLogger;
281
  private final InternalChannelz channelz;
282
  private final RealChannel realChannel;
283
  // Must be mutated and read from syncContext
284
  // a flag for doing channel tracing when flipped
285
  private ResolutionState lastResolutionState = ResolutionState.NO_RESOLUTION;
1✔
286
  // Must be mutated and read from constructor or syncContext
287
  // used for channel tracing when value changed
288
  private ManagedChannelServiceConfig lastServiceConfig = EMPTY_SERVICE_CONFIG;
1✔
289

290
  @Nullable
291
  private final ManagedChannelServiceConfig defaultServiceConfig;
292
  // Must be mutated and read from constructor or syncContext
293
  private boolean serviceConfigUpdated = false;
1✔
294
  private final boolean lookUpServiceConfig;
295

296
  // One instance per channel.
297
  private final ChannelBufferMeter channelBufferUsed = new ChannelBufferMeter();
1✔
298

299
  private final long perRpcBufferLimit;
300
  private final long channelBufferLimit;
301

302
  // Temporary false flag that can skip the retry code path.
303
  private final boolean retryEnabled;
304

305
  private final Deadline.Ticker ticker = Deadline.getSystemTicker();
1✔
306

307
  // Called from syncContext
308
  private final ManagedClientTransport.Listener delayedTransportListener =
1✔
309
      new DelayedTransportListener();
310

311
  // Must be called from syncContext
312
  private void maybeShutdownNowSubchannels() {
313
    if (shutdownNowed) {
1✔
314
      for (InternalSubchannel subchannel : subchannels) {
1✔
315
        subchannel.shutdownNow(SHUTDOWN_NOW_STATUS);
1✔
316
      }
1✔
317
      for (OobChannel oobChannel : oobChannels) {
1✔
318
        oobChannel.getInternalSubchannel().shutdownNow(SHUTDOWN_NOW_STATUS);
1✔
319
      }
1✔
320
    }
321
  }
1✔
322

323
  // Must be accessed from syncContext
324
  @VisibleForTesting
1✔
325
  final InUseStateAggregator<Object> inUseStateAggregator = new IdleModeStateAggregator();
326

327
  @Override
328
  public ListenableFuture<ChannelStats> getStats() {
329
    final SettableFuture<ChannelStats> ret = SettableFuture.create();
1✔
330
    final class StatsFetcher implements Runnable {
1✔
331
      @Override
332
      public void run() {
333
        ChannelStats.Builder builder = new InternalChannelz.ChannelStats.Builder();
1✔
334
        channelCallTracer.updateBuilder(builder);
1✔
335
        channelTracer.updateBuilder(builder);
1✔
336
        builder.setTarget(target).setState(channelStateManager.getState());
1✔
337
        List<InternalWithLogId> children = new ArrayList<>();
1✔
338
        children.addAll(subchannels);
1✔
339
        children.addAll(oobChannels);
1✔
340
        builder.setSubchannels(children);
1✔
341
        ret.set(builder.build());
1✔
342
      }
1✔
343
    }
344

345
    // subchannels and oobchannels can only be accessed from syncContext
346
    syncContext.execute(new StatsFetcher());
1✔
347
    return ret;
1✔
348
  }
349

350
  @Override
351
  public InternalLogId getLogId() {
352
    return logId;
1✔
353
  }
354

355
  // Run from syncContext
356
  private class IdleModeTimer implements Runnable {
1✔
357

358
    @Override
359
    public void run() {
360
      // Workaround timer scheduled while in idle mode. This can happen from handleNotInUse() after
361
      // an explicit enterIdleMode() by the user. Protecting here as other locations are a bit too
362
      // subtle to change rapidly to resolve the channel panic. See #8714
363
      if (lbHelper == null) {
1✔
364
        return;
×
365
      }
366
      enterIdleMode();
1✔
367
    }
1✔
368
  }
369

370
  // Must be called from syncContext
371
  private void shutdownNameResolverAndLoadBalancer(boolean channelIsActive) {
372
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
373
    if (channelIsActive) {
1✔
374
      checkState(nameResolverStarted, "nameResolver is not started");
1✔
375
      checkState(lbHelper != null, "lbHelper is null");
1✔
376
    }
377
    if (nameResolver != null) {
1✔
378
      nameResolver.shutdown();
1✔
379
      nameResolverStarted = false;
1✔
380
      if (channelIsActive) {
1✔
381
        nameResolver = getNameResolver(
1✔
382
            targetUri, authorityOverride, nameResolverProvider, nameResolverArgs);
383
      } else {
384
        nameResolver = null;
1✔
385
      }
386
    }
387
    if (lbHelper != null) {
1✔
388
      lbHelper.lb.shutdown();
1✔
389
      lbHelper = null;
1✔
390
    }
391
    subchannelPicker = null;
1✔
392
  }
1✔
393

394
  /**
395
   * Make the channel exit idle mode, if it's in it.
396
   *
397
   * <p>Must be called from syncContext
398
   */
399
  @VisibleForTesting
400
  void exitIdleMode() {
401
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
402
    if (shutdown.get() || panicMode) {
1✔
403
      return;
1✔
404
    }
405
    if (inUseStateAggregator.isInUse()) {
1✔
406
      // Cancel the timer now, so that a racing due timer will not put Channel on idleness
407
      // when the caller of exitIdleMode() is about to use the returned loadBalancer.
408
      cancelIdleTimer(false);
1✔
409
    } else {
410
      // exitIdleMode() may be called outside of inUseStateAggregator.handleNotInUse() while
411
      // isInUse() == false, in which case we still need to schedule the timer.
412
      rescheduleIdleTimer();
1✔
413
    }
414
    if (lbHelper != null) {
1✔
415
      return;
1✔
416
    }
417
    channelLogger.log(ChannelLogLevel.INFO, "Exiting idle mode");
1✔
418
    LbHelperImpl lbHelper = new LbHelperImpl();
1✔
419
    lbHelper.lb = loadBalancerFactory.newLoadBalancer(lbHelper);
1✔
420
    // Delay setting lbHelper until fully initialized, since loadBalancerFactory is user code and
421
    // may throw. We don't want to confuse our state, even if we will enter panic mode.
422
    this.lbHelper = lbHelper;
1✔
423

424
    channelStateManager.gotoState(CONNECTING);
1✔
425
    NameResolverListener listener = new NameResolverListener(lbHelper, nameResolver);
1✔
426
    nameResolver.start(listener);
1✔
427
    nameResolverStarted = true;
1✔
428
  }
1✔
429

430
  // Must be run from syncContext
431
  private void enterIdleMode() {
432
    // nameResolver and loadBalancer are guaranteed to be non-null.  If any of them were null,
433
    // either the idleModeTimer ran twice without exiting the idle mode, or the task in shutdown()
434
    // did not cancel idleModeTimer, or enterIdle() ran while shutdown or in idle, all of
435
    // which are bugs.
436
    shutdownNameResolverAndLoadBalancer(true);
1✔
437
    delayedTransport.reprocess(null);
1✔
438
    channelLogger.log(ChannelLogLevel.INFO, "Entering IDLE state");
1✔
439
    channelStateManager.gotoState(IDLE);
1✔
440
    // If the inUseStateAggregator still considers pending calls to be queued up or the delayed
441
    // transport to be holding some we need to exit idle mode to give these calls a chance to
442
    // be processed.
443
    if (inUseStateAggregator.anyObjectInUse(pendingCallsInUseObject, delayedTransport)) {
1✔
444
      exitIdleMode();
1✔
445
    }
446
  }
1✔
447

448
  // Must be run from syncContext
449
  private void cancelIdleTimer(boolean permanent) {
450
    idleTimer.cancel(permanent);
1✔
451
  }
1✔
452

453
  // Always run from syncContext
454
  private void rescheduleIdleTimer() {
455
    if (idleTimeoutMillis == IDLE_TIMEOUT_MILLIS_DISABLE) {
1✔
456
      return;
1✔
457
    }
458
    idleTimer.reschedule(idleTimeoutMillis, TimeUnit.MILLISECONDS);
1✔
459
  }
1✔
460

461
  /**
462
   * Force name resolution refresh to happen immediately. Must be run
463
   * from syncContext.
464
   */
465
  private void refreshNameResolution() {
466
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
467
    if (nameResolverStarted) {
1✔
468
      nameResolver.refresh();
1✔
469
    }
470
  }
1✔
471

472
  private final class ChannelStreamProvider implements ClientStreamProvider {
1✔
473
    volatile Throttle throttle;
474

475
    @Override
476
    public ClientStream newStream(
477
        final MethodDescriptor<?, ?> method,
478
        final CallOptions callOptions,
479
        final Metadata headers,
480
        final Context context) {
481
      // There is no need to reschedule the idle timer here. If the channel isn't shut down, either
482
      // the delayed transport or a real transport will go in-use and cancel the idle timer.
483
      if (!retryEnabled) {
1✔
484
        ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers(
1✔
485
            callOptions, headers, 0, /* isTransparentRetry= */ false);
486
        Context origContext = context.attach();
1✔
487
        try {
488
          return delayedTransport.newStream(method, headers, callOptions, tracers);
1✔
489
        } finally {
490
          context.detach(origContext);
1✔
491
        }
492
      } else {
493
        MethodInfo methodInfo = callOptions.getOption(MethodInfo.KEY);
1✔
494
        final RetryPolicy retryPolicy = methodInfo == null ? null : methodInfo.retryPolicy;
1✔
495
        final HedgingPolicy hedgingPolicy = methodInfo == null ? null : methodInfo.hedgingPolicy;
1✔
496
        final class RetryStream<ReqT> extends RetriableStream<ReqT> {
497
          @SuppressWarnings("unchecked")
498
          RetryStream() {
1✔
499
            super(
1✔
500
                (MethodDescriptor<ReqT, ?>) method,
501
                headers,
502
                channelBufferUsed,
1✔
503
                perRpcBufferLimit,
1✔
504
                channelBufferLimit,
1✔
505
                getCallExecutor(callOptions),
1✔
506
                transportFactory.getScheduledExecutorService(),
1✔
507
                retryPolicy,
508
                hedgingPolicy,
509
                throttle);
510
          }
1✔
511

512
          @Override
513
          Status prestart() {
514
            return uncommittedRetriableStreamsRegistry.add(this);
1✔
515
          }
516

517
          @Override
518
          void postCommit() {
519
            uncommittedRetriableStreamsRegistry.remove(this);
1✔
520
          }
1✔
521

522
          @Override
523
          ClientStream newSubstream(
524
              Metadata newHeaders, ClientStreamTracer.Factory factory, int previousAttempts,
525
              boolean isTransparentRetry) {
526
            CallOptions newOptions = callOptions.withStreamTracerFactory(factory);
1✔
527
            ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers(
1✔
528
                newOptions, newHeaders, previousAttempts, isTransparentRetry);
529
            Context origContext = context.attach();
1✔
530
            try {
531
              return delayedTransport.newStream(method, newHeaders, newOptions, tracers);
1✔
532
            } finally {
533
              context.detach(origContext);
1✔
534
            }
535
          }
536
        }
537

538
        return new RetryStream<>();
1✔
539
      }
540
    }
541
  }
542

543
  private final ChannelStreamProvider transportProvider = new ChannelStreamProvider();
1✔
544

545
  private final Rescheduler idleTimer;
546
  private final MetricRecorder metricRecorder;
547

548
  ManagedChannelImpl(
549
      ManagedChannelImplBuilder builder,
550
      ClientTransportFactory clientTransportFactory,
551
      URI targetUri,
552
      NameResolverProvider nameResolverProvider,
553
      BackoffPolicy.Provider backoffPolicyProvider,
554
      ObjectPool<? extends Executor> balancerRpcExecutorPool,
555
      Supplier<Stopwatch> stopwatchSupplier,
556
      List<ClientInterceptor> interceptors,
557
      final TimeProvider timeProvider) {
1✔
558
    this.target = checkNotNull(builder.target, "target");
1✔
559
    this.logId = InternalLogId.allocate("Channel", target);
1✔
560
    this.timeProvider = checkNotNull(timeProvider, "timeProvider");
1✔
561
    this.executorPool = checkNotNull(builder.executorPool, "executorPool");
1✔
562
    this.executor = checkNotNull(executorPool.getObject(), "executor");
1✔
563
    this.originalChannelCreds = builder.channelCredentials;
1✔
564
    this.originalTransportFactory = clientTransportFactory;
1✔
565
    this.offloadExecutorHolder =
1✔
566
        new ExecutorHolder(checkNotNull(builder.offloadExecutorPool, "offloadExecutorPool"));
1✔
567
    this.transportFactory = new CallCredentialsApplyingTransportFactory(
1✔
568
        clientTransportFactory, builder.callCredentials, this.offloadExecutorHolder);
569
    this.oobTransportFactory = new CallCredentialsApplyingTransportFactory(
1✔
570
        clientTransportFactory, null, this.offloadExecutorHolder);
571
    this.scheduledExecutor =
1✔
572
        new RestrictedScheduledExecutor(transportFactory.getScheduledExecutorService());
1✔
573
    maxTraceEvents = builder.maxTraceEvents;
1✔
574
    channelTracer = new ChannelTracer(
1✔
575
        logId, builder.maxTraceEvents, timeProvider.currentTimeNanos(),
1✔
576
        "Channel for '" + target + "'");
577
    channelLogger = new ChannelLoggerImpl(channelTracer, timeProvider);
1✔
578
    ProxyDetector proxyDetector =
579
        builder.proxyDetector != null ? builder.proxyDetector : GrpcUtil.DEFAULT_PROXY_DETECTOR;
1✔
580
    this.retryEnabled = builder.retryEnabled;
1✔
581
    this.loadBalancerFactory = new AutoConfiguredLoadBalancerFactory(builder.defaultLbPolicy);
1✔
582
    this.nameResolverRegistry = builder.nameResolverRegistry;
1✔
583
    this.targetUri = checkNotNull(targetUri, "targetUri");
1✔
584
    this.nameResolverProvider = checkNotNull(nameResolverProvider, "nameResolverProvider");
1✔
585
    ScParser serviceConfigParser =
1✔
586
        new ScParser(
587
            retryEnabled,
588
            builder.maxRetryAttempts,
589
            builder.maxHedgedAttempts,
590
            loadBalancerFactory);
591
    this.authorityOverride = builder.authorityOverride;
1✔
592
    this.nameResolverArgs =
1✔
593
        NameResolver.Args.newBuilder()
1✔
594
            .setDefaultPort(builder.getDefaultPort())
1✔
595
            .setProxyDetector(proxyDetector)
1✔
596
            .setSynchronizationContext(syncContext)
1✔
597
            .setScheduledExecutorService(scheduledExecutor)
1✔
598
            .setServiceConfigParser(serviceConfigParser)
1✔
599
            .setChannelLogger(channelLogger)
1✔
600
            .setOffloadExecutor(this.offloadExecutorHolder)
1✔
601
            .setOverrideAuthority(this.authorityOverride)
1✔
602
            .build();
1✔
603
    this.nameResolver = getNameResolver(
1✔
604
        targetUri, authorityOverride, nameResolverProvider, nameResolverArgs);
605
    this.balancerRpcExecutorPool = checkNotNull(balancerRpcExecutorPool, "balancerRpcExecutorPool");
1✔
606
    this.balancerRpcExecutorHolder = new ExecutorHolder(balancerRpcExecutorPool);
1✔
607
    this.delayedTransport = new DelayedClientTransport(this.executor, this.syncContext);
1✔
608
    this.delayedTransport.start(delayedTransportListener);
1✔
609
    this.backoffPolicyProvider = backoffPolicyProvider;
1✔
610

611
    if (builder.defaultServiceConfig != null) {
1✔
612
      ConfigOrError parsedDefaultServiceConfig =
1✔
613
          serviceConfigParser.parseServiceConfig(builder.defaultServiceConfig);
1✔
614
      checkState(
1✔
615
          parsedDefaultServiceConfig.getError() == null,
1✔
616
          "Default config is invalid: %s",
617
          parsedDefaultServiceConfig.getError());
1✔
618
      this.defaultServiceConfig =
1✔
619
          (ManagedChannelServiceConfig) parsedDefaultServiceConfig.getConfig();
1✔
620
      this.transportProvider.throttle = this.defaultServiceConfig.getRetryThrottling();
1✔
621
    } else {
1✔
622
      this.defaultServiceConfig = null;
1✔
623
    }
624
    this.lookUpServiceConfig = builder.lookUpServiceConfig;
1✔
625
    realChannel = new RealChannel(nameResolver.getServiceAuthority());
1✔
626
    Channel channel = realChannel;
1✔
627
    if (builder.binlog != null) {
1✔
628
      channel = builder.binlog.wrapChannel(channel);
1✔
629
    }
630
    this.interceptorChannel = ClientInterceptors.intercept(channel, interceptors);
1✔
631
    this.transportFilters = new ArrayList<>(builder.transportFilters);
1✔
632
    this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier");
1✔
633
    if (builder.idleTimeoutMillis == IDLE_TIMEOUT_MILLIS_DISABLE) {
1✔
634
      this.idleTimeoutMillis = builder.idleTimeoutMillis;
1✔
635
    } else {
636
      checkArgument(
1✔
637
          builder.idleTimeoutMillis
638
              >= ManagedChannelImplBuilder.IDLE_MODE_MIN_TIMEOUT_MILLIS,
639
          "invalid idleTimeoutMillis %s", builder.idleTimeoutMillis);
640
      this.idleTimeoutMillis = builder.idleTimeoutMillis;
1✔
641
    }
642

643
    idleTimer = new Rescheduler(
1✔
644
        new IdleModeTimer(),
645
        syncContext,
646
        transportFactory.getScheduledExecutorService(),
1✔
647
        stopwatchSupplier.get());
1✔
648
    this.fullStreamDecompression = builder.fullStreamDecompression;
1✔
649
    this.decompressorRegistry = checkNotNull(builder.decompressorRegistry, "decompressorRegistry");
1✔
650
    this.compressorRegistry = checkNotNull(builder.compressorRegistry, "compressorRegistry");
1✔
651
    this.userAgent = builder.userAgent;
1✔
652

653
    this.channelBufferLimit = builder.retryBufferSize;
1✔
654
    this.perRpcBufferLimit = builder.perRpcBufferLimit;
1✔
655
    final class ChannelCallTracerFactory implements CallTracer.Factory {
1✔
656
      @Override
657
      public CallTracer create() {
658
        return new CallTracer(timeProvider);
1✔
659
      }
660
    }
661

662
    this.callTracerFactory = new ChannelCallTracerFactory();
1✔
663
    channelCallTracer = callTracerFactory.create();
1✔
664
    this.channelz = checkNotNull(builder.channelz);
1✔
665
    channelz.addRootChannel(this);
1✔
666

667
    if (!lookUpServiceConfig) {
1✔
668
      if (defaultServiceConfig != null) {
1✔
669
        channelLogger.log(
1✔
670
            ChannelLogLevel.INFO, "Service config look-up disabled, using default service config");
671
      }
672
      serviceConfigUpdated = true;
1✔
673
    }
674
    this.metricRecorder = new MetricRecorderImpl(builder.metricSinks,
1✔
675
        MetricInstrumentRegistry.getDefaultRegistry());
1✔
676
  }
1✔
677

678
  @VisibleForTesting
679
  static NameResolver getNameResolver(
680
      URI targetUri, @Nullable final String overrideAuthority,
681
      NameResolverProvider provider, NameResolver.Args nameResolverArgs) {
682
    NameResolver resolver = provider.newNameResolver(targetUri, nameResolverArgs);
1✔
683
    if (resolver == null) {
1✔
684
      throw new IllegalArgumentException("cannot create a NameResolver for " + targetUri);
1✔
685
    }
686

687
    // We wrap the name resolver in a RetryingNameResolver to give it the ability to retry failures.
688
    // TODO: After a transition period, all NameResolver implementations that need retry should use
689
    //       RetryingNameResolver directly and this step can be removed.
690
    NameResolver usedNameResolver = new RetryingNameResolver(resolver,
1✔
691
          new BackoffPolicyRetryScheduler(new ExponentialBackoffPolicy.Provider(),
692
              nameResolverArgs.getScheduledExecutorService(),
1✔
693
              nameResolverArgs.getSynchronizationContext()),
1✔
694
          nameResolverArgs.getSynchronizationContext());
1✔
695

696
    if (overrideAuthority == null) {
1✔
697
      return usedNameResolver;
1✔
698
    }
699

700
    return new ForwardingNameResolver(usedNameResolver) {
1✔
701
      @Override
702
      public String getServiceAuthority() {
703
        return overrideAuthority;
1✔
704
      }
705
    };
706
  }
707

708
  @VisibleForTesting
709
  InternalConfigSelector getConfigSelector() {
710
    return realChannel.configSelector.get();
1✔
711
  }
712
  
713
  @VisibleForTesting
714
  boolean hasThrottle() {
715
    return this.transportProvider.throttle != null;
1✔
716
  }
717

718
  /**
719
   * Initiates an orderly shutdown in which preexisting calls continue but new calls are immediately
720
   * cancelled.
721
   */
722
  @Override
723
  public ManagedChannelImpl shutdown() {
724
    channelLogger.log(ChannelLogLevel.DEBUG, "shutdown() called");
1✔
725
    if (!shutdown.compareAndSet(false, true)) {
1✔
726
      return this;
1✔
727
    }
728
    final class Shutdown implements Runnable {
1✔
729
      @Override
730
      public void run() {
731
        channelLogger.log(ChannelLogLevel.INFO, "Entering SHUTDOWN state");
1✔
732
        channelStateManager.gotoState(SHUTDOWN);
1✔
733
      }
1✔
734
    }
735

736
    syncContext.execute(new Shutdown());
1✔
737
    realChannel.shutdown();
1✔
738
    final class CancelIdleTimer implements Runnable {
1✔
739
      @Override
740
      public void run() {
741
        cancelIdleTimer(/* permanent= */ true);
1✔
742
      }
1✔
743
    }
744

745
    syncContext.execute(new CancelIdleTimer());
1✔
746
    return this;
1✔
747
  }
748

749
  /**
750
   * Initiates a forceful shutdown in which preexisting and new calls are cancelled. Although
751
   * forceful, the shutdown process is still not instantaneous; {@link #isTerminated()} will likely
752
   * return {@code false} immediately after this method returns.
753
   */
754
  @Override
755
  public ManagedChannelImpl shutdownNow() {
756
    channelLogger.log(ChannelLogLevel.DEBUG, "shutdownNow() called");
1✔
757
    shutdown();
1✔
758
    realChannel.shutdownNow();
1✔
759
    final class ShutdownNow implements Runnable {
1✔
760
      @Override
761
      public void run() {
762
        if (shutdownNowed) {
1✔
763
          return;
1✔
764
        }
765
        shutdownNowed = true;
1✔
766
        maybeShutdownNowSubchannels();
1✔
767
      }
1✔
768
    }
769

770
    syncContext.execute(new ShutdownNow());
1✔
771
    return this;
1✔
772
  }
773

774
  // Called from syncContext
775
  @VisibleForTesting
776
  void panic(final Throwable t) {
777
    if (panicMode) {
1✔
778
      // Preserve the first panic information
779
      return;
×
780
    }
781
    panicMode = true;
1✔
782
    cancelIdleTimer(/* permanent= */ true);
1✔
783
    shutdownNameResolverAndLoadBalancer(false);
1✔
784
    final class PanicSubchannelPicker extends SubchannelPicker {
1✔
785
      private final PickResult panicPickResult =
1✔
786
          PickResult.withDrop(
1✔
787
              Status.INTERNAL.withDescription("Panic! This is a bug!").withCause(t));
1✔
788

789
      @Override
790
      public PickResult pickSubchannel(PickSubchannelArgs args) {
791
        return panicPickResult;
1✔
792
      }
793

794
      @Override
795
      public String toString() {
796
        return MoreObjects.toStringHelper(PanicSubchannelPicker.class)
×
797
            .add("panicPickResult", panicPickResult)
×
798
            .toString();
×
799
      }
800
    }
801

802
    updateSubchannelPicker(new PanicSubchannelPicker());
1✔
803
    realChannel.updateConfigSelector(null);
1✔
804
    channelLogger.log(ChannelLogLevel.ERROR, "PANIC! Entering TRANSIENT_FAILURE");
1✔
805
    channelStateManager.gotoState(TRANSIENT_FAILURE);
1✔
806
  }
1✔
807

808
  @VisibleForTesting
809
  boolean isInPanicMode() {
810
    return panicMode;
1✔
811
  }
812

813
  // Called from syncContext
814
  private void updateSubchannelPicker(SubchannelPicker newPicker) {
815
    subchannelPicker = newPicker;
1✔
816
    delayedTransport.reprocess(newPicker);
1✔
817
  }
1✔
818

819
  @Override
820
  public boolean isShutdown() {
821
    return shutdown.get();
1✔
822
  }
823

824
  @Override
825
  public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
826
    return terminatedLatch.await(timeout, unit);
1✔
827
  }
828

829
  @Override
830
  public boolean isTerminated() {
831
    return terminated;
1✔
832
  }
833

834
  /*
835
   * Creates a new outgoing call on the channel.
836
   */
837
  @Override
838
  public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(MethodDescriptor<ReqT, RespT> method,
839
      CallOptions callOptions) {
840
    return interceptorChannel.newCall(method, callOptions);
1✔
841
  }
842

843
  @Override
844
  public String authority() {
845
    return interceptorChannel.authority();
1✔
846
  }
847

848
  private Executor getCallExecutor(CallOptions callOptions) {
849
    Executor executor = callOptions.getExecutor();
1✔
850
    if (executor == null) {
1✔
851
      executor = this.executor;
1✔
852
    }
853
    return executor;
1✔
854
  }
855

856
  private class RealChannel extends Channel {
857
    // Reference to null if no config selector is available from resolution result
858
    // Reference must be set() from syncContext
859
    private final AtomicReference<InternalConfigSelector> configSelector =
1✔
860
        new AtomicReference<>(INITIAL_PENDING_SELECTOR);
1✔
861
    // Set when the NameResolver is initially created. When we create a new NameResolver for the
862
    // same target, the new instance must have the same value.
863
    private final String authority;
864

865
    private final Channel clientCallImplChannel = new Channel() {
1✔
866
      @Override
867
      public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
868
          MethodDescriptor<RequestT, ResponseT> method, CallOptions callOptions) {
869
        return new ClientCallImpl<>(
1✔
870
            method,
871
            getCallExecutor(callOptions),
1✔
872
            callOptions,
873
            transportProvider,
1✔
874
            terminated ? null : transportFactory.getScheduledExecutorService(),
1✔
875
            channelCallTracer,
1✔
876
            null)
877
            .setFullStreamDecompression(fullStreamDecompression)
1✔
878
            .setDecompressorRegistry(decompressorRegistry)
1✔
879
            .setCompressorRegistry(compressorRegistry);
1✔
880
      }
881

882
      @Override
883
      public String authority() {
884
        return authority;
×
885
      }
886
    };
887

888
    private RealChannel(String authority) {
1✔
889
      this.authority =  checkNotNull(authority, "authority");
1✔
890
    }
1✔
891

892
    @Override
893
    public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(
894
        MethodDescriptor<ReqT, RespT> method, CallOptions callOptions) {
895
      if (configSelector.get() != INITIAL_PENDING_SELECTOR) {
1✔
896
        return newClientCall(method, callOptions);
1✔
897
      }
898
      syncContext.execute(new Runnable() {
1✔
899
        @Override
900
        public void run() {
901
          exitIdleMode();
1✔
902
        }
1✔
903
      });
904
      if (configSelector.get() != INITIAL_PENDING_SELECTOR) {
1✔
905
        // This is an optimization for the case (typically with InProcessTransport) when name
906
        // resolution result is immediately available at this point. Otherwise, some users'
907
        // tests might observe slight behavior difference from earlier grpc versions.
908
        return newClientCall(method, callOptions);
1✔
909
      }
910
      if (shutdown.get()) {
1✔
911
        // Return a failing ClientCall.
912
        return new ClientCall<ReqT, RespT>() {
×
913
          @Override
914
          public void start(Listener<RespT> responseListener, Metadata headers) {
915
            responseListener.onClose(SHUTDOWN_STATUS, new Metadata());
×
916
          }
×
917

918
          @Override public void request(int numMessages) {}
×
919

920
          @Override public void cancel(@Nullable String message, @Nullable Throwable cause) {}
×
921

922
          @Override public void halfClose() {}
×
923

924
          @Override public void sendMessage(ReqT message) {}
×
925
        };
926
      }
927
      Context context = Context.current();
1✔
928
      final PendingCall<ReqT, RespT> pendingCall = new PendingCall<>(context, method, callOptions);
1✔
929
      syncContext.execute(new Runnable() {
1✔
930
        @Override
931
        public void run() {
932
          if (configSelector.get() == INITIAL_PENDING_SELECTOR) {
1✔
933
            if (pendingCalls == null) {
1✔
934
              pendingCalls = new LinkedHashSet<>();
1✔
935
              inUseStateAggregator.updateObjectInUse(pendingCallsInUseObject, true);
1✔
936
            }
937
            pendingCalls.add(pendingCall);
1✔
938
          } else {
939
            pendingCall.reprocess();
1✔
940
          }
941
        }
1✔
942
      });
943
      return pendingCall;
1✔
944
    }
945

946
    // Must run in SynchronizationContext.
947
    void updateConfigSelector(@Nullable InternalConfigSelector config) {
948
      InternalConfigSelector prevConfig = configSelector.get();
1✔
949
      configSelector.set(config);
1✔
950
      if (prevConfig == INITIAL_PENDING_SELECTOR && pendingCalls != null) {
1✔
951
        for (RealChannel.PendingCall<?, ?> pendingCall : pendingCalls) {
1✔
952
          pendingCall.reprocess();
1✔
953
        }
1✔
954
      }
955
    }
1✔
956

957
    // Must run in SynchronizationContext.
958
    void onConfigError() {
959
      if (configSelector.get() == INITIAL_PENDING_SELECTOR) {
1✔
960
        // Apply Default Service Config if initial name resolution fails.
961
        if (defaultServiceConfig != null) {
1✔
962
          updateConfigSelector(defaultServiceConfig.getDefaultConfigSelector());
1✔
963
          lastServiceConfig = defaultServiceConfig;
1✔
964
          channelLogger.log(ChannelLogLevel.ERROR,
1✔
965
              "Initial Name Resolution error, using default service config");
966
        } else {
967
          updateConfigSelector(null);
1✔
968
        }
969
      }
970
    }
1✔
971

972
    void shutdown() {
973
      final class RealChannelShutdown implements Runnable {
1✔
974
        @Override
975
        public void run() {
976
          if (pendingCalls == null) {
1✔
977
            if (configSelector.get() == INITIAL_PENDING_SELECTOR) {
1✔
978
              configSelector.set(null);
1✔
979
            }
980
            uncommittedRetriableStreamsRegistry.onShutdown(SHUTDOWN_STATUS);
1✔
981
          }
982
        }
1✔
983
      }
984

985
      syncContext.execute(new RealChannelShutdown());
1✔
986
    }
1✔
987

988
    void shutdownNow() {
989
      final class RealChannelShutdownNow implements Runnable {
1✔
990
        @Override
991
        public void run() {
992
          if (configSelector.get() == INITIAL_PENDING_SELECTOR) {
1✔
993
            configSelector.set(null);
1✔
994
          }
995
          if (pendingCalls != null) {
1✔
996
            for (RealChannel.PendingCall<?, ?> pendingCall : pendingCalls) {
1✔
997
              pendingCall.cancel("Channel is forcefully shutdown", null);
1✔
998
            }
1✔
999
          }
1000
          uncommittedRetriableStreamsRegistry.onShutdownNow(SHUTDOWN_NOW_STATUS);
1✔
1001
        }
1✔
1002
      }
1003

1004
      syncContext.execute(new RealChannelShutdownNow());
1✔
1005
    }
1✔
1006

1007
    @Override
1008
    public String authority() {
1009
      return authority;
1✔
1010
    }
1011

1012
    private final class PendingCall<ReqT, RespT> extends DelayedClientCall<ReqT, RespT> {
1013
      final Context context;
1014
      final MethodDescriptor<ReqT, RespT> method;
1015
      final CallOptions callOptions;
1016
      private final long callCreationTime;
1017

1018
      PendingCall(
1019
          Context context, MethodDescriptor<ReqT, RespT> method, CallOptions callOptions) {
1✔
1020
        super(getCallExecutor(callOptions), scheduledExecutor, callOptions.getDeadline());
1✔
1021
        this.context = context;
1✔
1022
        this.method = method;
1✔
1023
        this.callOptions = callOptions;
1✔
1024
        this.callCreationTime = ticker.nanoTime();
1✔
1025
      }
1✔
1026

1027
      /** Called when it's ready to create a real call and reprocess the pending call. */
1028
      void reprocess() {
1029
        ClientCall<ReqT, RespT> realCall;
1030
        Context previous = context.attach();
1✔
1031
        try {
1032
          CallOptions delayResolutionOption = callOptions.withOption(NAME_RESOLUTION_DELAYED,
1✔
1033
              ticker.nanoTime() - callCreationTime);
1✔
1034
          realCall = newClientCall(method, delayResolutionOption);
1✔
1035
        } finally {
1036
          context.detach(previous);
1✔
1037
        }
1038
        Runnable toRun = setCall(realCall);
1✔
1039
        if (toRun == null) {
1✔
1040
          syncContext.execute(new PendingCallRemoval());
1✔
1041
        } else {
1042
          getCallExecutor(callOptions).execute(new Runnable() {
1✔
1043
            @Override
1044
            public void run() {
1045
              toRun.run();
1✔
1046
              syncContext.execute(new PendingCallRemoval());
1✔
1047
            }
1✔
1048
          });
1049
        }
1050
      }
1✔
1051

1052
      @Override
1053
      protected void callCancelled() {
1054
        super.callCancelled();
1✔
1055
        syncContext.execute(new PendingCallRemoval());
1✔
1056
      }
1✔
1057

1058
      final class PendingCallRemoval implements Runnable {
1✔
1059
        @Override
1060
        public void run() {
1061
          if (pendingCalls != null) {
1✔
1062
            pendingCalls.remove(PendingCall.this);
1✔
1063
            if (pendingCalls.isEmpty()) {
1✔
1064
              inUseStateAggregator.updateObjectInUse(pendingCallsInUseObject, false);
1✔
1065
              pendingCalls = null;
1✔
1066
              if (shutdown.get()) {
1✔
1067
                uncommittedRetriableStreamsRegistry.onShutdown(SHUTDOWN_STATUS);
1✔
1068
              }
1069
            }
1070
          }
1071
        }
1✔
1072
      }
1073
    }
1074

1075
    private <ReqT, RespT> ClientCall<ReqT, RespT> newClientCall(
1076
        MethodDescriptor<ReqT, RespT> method, CallOptions callOptions) {
1077
      InternalConfigSelector selector = configSelector.get();
1✔
1078
      if (selector == null) {
1✔
1079
        return clientCallImplChannel.newCall(method, callOptions);
1✔
1080
      }
1081
      if (selector instanceof ServiceConfigConvertedSelector) {
1✔
1082
        MethodInfo methodInfo =
1✔
1083
            ((ServiceConfigConvertedSelector) selector).config.getMethodConfig(method);
1✔
1084
        if (methodInfo != null) {
1✔
1085
          callOptions = callOptions.withOption(MethodInfo.KEY, methodInfo);
1✔
1086
        }
1087
        return clientCallImplChannel.newCall(method, callOptions);
1✔
1088
      }
1089
      return new ConfigSelectingClientCall<>(
1✔
1090
          selector, clientCallImplChannel, executor, method, callOptions);
1✔
1091
    }
1092
  }
1093

1094
  /**
1095
   * A client call for a given channel that applies a given config selector when it starts.
1096
   */
1097
  static final class ConfigSelectingClientCall<ReqT, RespT>
1098
      extends ForwardingClientCall<ReqT, RespT> {
1099

1100
    private final InternalConfigSelector configSelector;
1101
    private final Channel channel;
1102
    private final Executor callExecutor;
1103
    private final MethodDescriptor<ReqT, RespT> method;
1104
    private final Context context;
1105
    private CallOptions callOptions;
1106

1107
    private ClientCall<ReqT, RespT> delegate;
1108

1109
    ConfigSelectingClientCall(
1110
        InternalConfigSelector configSelector, Channel channel, Executor channelExecutor,
1111
        MethodDescriptor<ReqT, RespT> method,
1112
        CallOptions callOptions) {
1✔
1113
      this.configSelector = configSelector;
1✔
1114
      this.channel = channel;
1✔
1115
      this.method = method;
1✔
1116
      this.callExecutor =
1✔
1117
          callOptions.getExecutor() == null ? channelExecutor : callOptions.getExecutor();
1✔
1118
      this.callOptions = callOptions.withExecutor(callExecutor);
1✔
1119
      this.context = Context.current();
1✔
1120
    }
1✔
1121

1122
    @Override
1123
    protected ClientCall<ReqT, RespT> delegate() {
1124
      return delegate;
1✔
1125
    }
1126

1127
    @SuppressWarnings("unchecked")
1128
    @Override
1129
    public void start(Listener<RespT> observer, Metadata headers) {
1130
      PickSubchannelArgs args =
1✔
1131
          new PickSubchannelArgsImpl(method, headers, callOptions, NOOP_PICK_DETAILS_CONSUMER);
1✔
1132
      InternalConfigSelector.Result result = configSelector.selectConfig(args);
1✔
1133
      Status status = result.getStatus();
1✔
1134
      if (!status.isOk()) {
1✔
1135
        executeCloseObserverInContext(observer,
1✔
1136
            GrpcUtil.replaceInappropriateControlPlaneStatus(status));
1✔
1137
        delegate = (ClientCall<ReqT, RespT>) NOOP_CALL;
1✔
1138
        return;
1✔
1139
      }
1140
      ClientInterceptor interceptor = result.getInterceptor();
1✔
1141
      ManagedChannelServiceConfig config = (ManagedChannelServiceConfig) result.getConfig();
1✔
1142
      MethodInfo methodInfo = config.getMethodConfig(method);
1✔
1143
      if (methodInfo != null) {
1✔
1144
        callOptions = callOptions.withOption(MethodInfo.KEY, methodInfo);
1✔
1145
      }
1146
      if (interceptor != null) {
1✔
1147
        delegate = interceptor.interceptCall(method, callOptions, channel);
1✔
1148
      } else {
1149
        delegate = channel.newCall(method, callOptions);
×
1150
      }
1151
      delegate.start(observer, headers);
1✔
1152
    }
1✔
1153

1154
    private void executeCloseObserverInContext(
1155
        final Listener<RespT> observer, final Status status) {
1156
      class CloseInContext extends ContextRunnable {
1157
        CloseInContext() {
1✔
1158
          super(context);
1✔
1159
        }
1✔
1160

1161
        @Override
1162
        public void runInContext() {
1163
          observer.onClose(status, new Metadata());
1✔
1164
        }
1✔
1165
      }
1166

1167
      callExecutor.execute(new CloseInContext());
1✔
1168
    }
1✔
1169

1170
    @Override
1171
    public void cancel(@Nullable String message, @Nullable Throwable cause) {
1172
      if (delegate != null) {
×
1173
        delegate.cancel(message, cause);
×
1174
      }
1175
    }
×
1176
  }
1177

1178
  private static final ClientCall<Object, Object> NOOP_CALL = new ClientCall<Object, Object>() {
1✔
1179
    @Override
1180
    public void start(Listener<Object> responseListener, Metadata headers) {}
×
1181

1182
    @Override
1183
    public void request(int numMessages) {}
1✔
1184

1185
    @Override
1186
    public void cancel(String message, Throwable cause) {}
×
1187

1188
    @Override
1189
    public void halfClose() {}
×
1190

1191
    @Override
1192
    public void sendMessage(Object message) {}
×
1193

1194
    // Always returns {@code false}, since this is only used when the startup of the call fails.
1195
    @Override
1196
    public boolean isReady() {
1197
      return false;
×
1198
    }
1199
  };
1200

1201
  /**
1202
   * Terminate the channel if termination conditions are met.
1203
   */
1204
  // Must be run from syncContext
1205
  private void maybeTerminateChannel() {
1206
    if (terminated) {
1✔
1207
      return;
×
1208
    }
1209
    if (shutdown.get() && subchannels.isEmpty() && oobChannels.isEmpty()) {
1✔
1210
      channelLogger.log(ChannelLogLevel.INFO, "Terminated");
1✔
1211
      channelz.removeRootChannel(this);
1✔
1212
      executorPool.returnObject(executor);
1✔
1213
      balancerRpcExecutorHolder.release();
1✔
1214
      offloadExecutorHolder.release();
1✔
1215
      // Release the transport factory so that it can deallocate any resources.
1216
      transportFactory.close();
1✔
1217

1218
      terminated = true;
1✔
1219
      terminatedLatch.countDown();
1✔
1220
    }
1221
  }
1✔
1222

1223
  // Must be called from syncContext
1224
  private void handleInternalSubchannelState(ConnectivityStateInfo newState) {
1225
    if (newState.getState() == TRANSIENT_FAILURE || newState.getState() == IDLE) {
1✔
1226
      refreshNameResolution();
1✔
1227
    }
1228
  }
1✔
1229

1230
  @Override
1231
  @SuppressWarnings("deprecation")
1232
  public ConnectivityState getState(boolean requestConnection) {
1233
    ConnectivityState savedChannelState = channelStateManager.getState();
1✔
1234
    if (requestConnection && savedChannelState == IDLE) {
1✔
1235
      final class RequestConnection implements Runnable {
1✔
1236
        @Override
1237
        public void run() {
1238
          exitIdleMode();
1✔
1239
          if (subchannelPicker != null) {
1✔
1240
            subchannelPicker.requestConnection();
1✔
1241
          }
1242
          if (lbHelper != null) {
1✔
1243
            lbHelper.lb.requestConnection();
1✔
1244
          }
1245
        }
1✔
1246
      }
1247

1248
      syncContext.execute(new RequestConnection());
1✔
1249
    }
1250
    return savedChannelState;
1✔
1251
  }
1252

1253
  @Override
1254
  public void notifyWhenStateChanged(final ConnectivityState source, final Runnable callback) {
1255
    final class NotifyStateChanged implements Runnable {
1✔
1256
      @Override
1257
      public void run() {
1258
        channelStateManager.notifyWhenStateChanged(callback, executor, source);
1✔
1259
      }
1✔
1260
    }
1261

1262
    syncContext.execute(new NotifyStateChanged());
1✔
1263
  }
1✔
1264

1265
  @Override
1266
  public void resetConnectBackoff() {
1267
    final class ResetConnectBackoff implements Runnable {
1✔
1268
      @Override
1269
      public void run() {
1270
        if (shutdown.get()) {
1✔
1271
          return;
1✔
1272
        }
1273
        if (nameResolverStarted) {
1✔
1274
          refreshNameResolution();
1✔
1275
        }
1276
        for (InternalSubchannel subchannel : subchannels) {
1✔
1277
          subchannel.resetConnectBackoff();
1✔
1278
        }
1✔
1279
        for (OobChannel oobChannel : oobChannels) {
1✔
1280
          oobChannel.resetConnectBackoff();
×
1281
        }
×
1282
      }
1✔
1283
    }
1284

1285
    syncContext.execute(new ResetConnectBackoff());
1✔
1286
  }
1✔
1287

1288
  @Override
1289
  public void enterIdle() {
1290
    final class PrepareToLoseNetworkRunnable implements Runnable {
1✔
1291
      @Override
1292
      public void run() {
1293
        if (shutdown.get() || lbHelper == null) {
1✔
1294
          return;
1✔
1295
        }
1296
        cancelIdleTimer(/* permanent= */ false);
1✔
1297
        enterIdleMode();
1✔
1298
      }
1✔
1299
    }
1300

1301
    syncContext.execute(new PrepareToLoseNetworkRunnable());
1✔
1302
  }
1✔
1303

1304
  /**
1305
   * A registry that prevents channel shutdown from killing existing retry attempts that are in
1306
   * backoff.
1307
   */
1308
  private final class UncommittedRetriableStreamsRegistry {
1✔
1309
    // TODO(zdapeng): This means we would acquire a lock for each new retry-able stream,
1310
    // it's worthwhile to look for a lock-free approach.
1311
    final Object lock = new Object();
1✔
1312

1313
    @GuardedBy("lock")
1✔
1314
    Collection<ClientStream> uncommittedRetriableStreams = new HashSet<>();
1315

1316
    @GuardedBy("lock")
1317
    Status shutdownStatus;
1318

1319
    void onShutdown(Status reason) {
1320
      boolean shouldShutdownDelayedTransport = false;
1✔
1321
      synchronized (lock) {
1✔
1322
        if (shutdownStatus != null) {
1✔
1323
          return;
1✔
1324
        }
1325
        shutdownStatus = reason;
1✔
1326
        // Keep the delayedTransport open until there is no more uncommitted streams, b/c those
1327
        // retriable streams, which may be in backoff and not using any transport, are already
1328
        // started RPCs.
1329
        if (uncommittedRetriableStreams.isEmpty()) {
1✔
1330
          shouldShutdownDelayedTransport = true;
1✔
1331
        }
1332
      }
1✔
1333

1334
      if (shouldShutdownDelayedTransport) {
1✔
1335
        delayedTransport.shutdown(reason);
1✔
1336
      }
1337
    }
1✔
1338

1339
    void onShutdownNow(Status reason) {
1340
      onShutdown(reason);
1✔
1341
      Collection<ClientStream> streams;
1342

1343
      synchronized (lock) {
1✔
1344
        streams = new ArrayList<>(uncommittedRetriableStreams);
1✔
1345
      }
1✔
1346

1347
      for (ClientStream stream : streams) {
1✔
1348
        stream.cancel(reason);
1✔
1349
      }
1✔
1350
      delayedTransport.shutdownNow(reason);
1✔
1351
    }
1✔
1352

1353
    /**
1354
     * Registers a RetriableStream and return null if not shutdown, otherwise just returns the
1355
     * shutdown Status.
1356
     */
1357
    @Nullable
1358
    Status add(RetriableStream<?> retriableStream) {
1359
      synchronized (lock) {
1✔
1360
        if (shutdownStatus != null) {
1✔
1361
          return shutdownStatus;
1✔
1362
        }
1363
        uncommittedRetriableStreams.add(retriableStream);
1✔
1364
        return null;
1✔
1365
      }
1366
    }
1367

1368
    void remove(RetriableStream<?> retriableStream) {
1369
      Status shutdownStatusCopy = null;
1✔
1370

1371
      synchronized (lock) {
1✔
1372
        uncommittedRetriableStreams.remove(retriableStream);
1✔
1373
        if (uncommittedRetriableStreams.isEmpty()) {
1✔
1374
          shutdownStatusCopy = shutdownStatus;
1✔
1375
          // Because retriable transport is long-lived, we take this opportunity to down-size the
1376
          // hashmap.
1377
          uncommittedRetriableStreams = new HashSet<>();
1✔
1378
        }
1379
      }
1✔
1380

1381
      if (shutdownStatusCopy != null) {
1✔
1382
        delayedTransport.shutdown(shutdownStatusCopy);
1✔
1383
      }
1384
    }
1✔
1385
  }
1386

1387
  private final class LbHelperImpl extends LoadBalancer.Helper {
1✔
1388
    AutoConfiguredLoadBalancer lb;
1389

1390
    @Override
1391
    public AbstractSubchannel createSubchannel(CreateSubchannelArgs args) {
1392
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1393
      // No new subchannel should be created after load balancer has been shutdown.
1394
      checkState(!terminating, "Channel is being terminated");
1✔
1395
      return new SubchannelImpl(args);
1✔
1396
    }
1397

1398
    @Override
1399
    public void updateBalancingState(
1400
        final ConnectivityState newState, final SubchannelPicker newPicker) {
1401
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1402
      checkNotNull(newState, "newState");
1✔
1403
      checkNotNull(newPicker, "newPicker");
1✔
1404
      final class UpdateBalancingState implements Runnable {
1✔
1405
        @Override
1406
        public void run() {
1407
          if (LbHelperImpl.this != lbHelper) {
1✔
1408
            return;
1✔
1409
          }
1410
          updateSubchannelPicker(newPicker);
1✔
1411
          // It's not appropriate to report SHUTDOWN state from lb.
1412
          // Ignore the case of newState == SHUTDOWN for now.
1413
          if (newState != SHUTDOWN) {
1✔
1414
            channelLogger.log(
1✔
1415
                ChannelLogLevel.INFO, "Entering {0} state with picker: {1}", newState, newPicker);
1416
            channelStateManager.gotoState(newState);
1✔
1417
          }
1418
        }
1✔
1419
      }
1420

1421
      syncContext.execute(new UpdateBalancingState());
1✔
1422
    }
1✔
1423

1424
    @Override
1425
    public void refreshNameResolution() {
1426
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1427
      final class LoadBalancerRefreshNameResolution implements Runnable {
1✔
1428
        @Override
1429
        public void run() {
1430
          ManagedChannelImpl.this.refreshNameResolution();
1✔
1431
        }
1✔
1432
      }
1433

1434
      syncContext.execute(new LoadBalancerRefreshNameResolution());
1✔
1435
    }
1✔
1436

1437
    @Override
1438
    public ManagedChannel createOobChannel(EquivalentAddressGroup addressGroup, String authority) {
1439
      return createOobChannel(Collections.singletonList(addressGroup), authority);
×
1440
    }
1441

1442
    @Override
1443
    public ManagedChannel createOobChannel(List<EquivalentAddressGroup> addressGroup,
1444
        String authority) {
1445
      // TODO(ejona): can we be even stricter? Like terminating?
1446
      checkState(!terminated, "Channel is terminated");
1✔
1447
      long oobChannelCreationTime = timeProvider.currentTimeNanos();
1✔
1448
      InternalLogId oobLogId = InternalLogId.allocate("OobChannel", /*details=*/ null);
1✔
1449
      InternalLogId subchannelLogId =
1✔
1450
          InternalLogId.allocate("Subchannel-OOB", /*details=*/ authority);
1✔
1451
      ChannelTracer oobChannelTracer =
1✔
1452
          new ChannelTracer(
1453
              oobLogId, maxTraceEvents, oobChannelCreationTime,
1✔
1454
              "OobChannel for " + addressGroup);
1455
      final OobChannel oobChannel = new OobChannel(
1✔
1456
          authority, balancerRpcExecutorPool, oobTransportFactory.getScheduledExecutorService(),
1✔
1457
          syncContext, callTracerFactory.create(), oobChannelTracer, channelz, timeProvider);
1✔
1458
      channelTracer.reportEvent(new ChannelTrace.Event.Builder()
1✔
1459
          .setDescription("Child OobChannel created")
1✔
1460
          .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
1✔
1461
          .setTimestampNanos(oobChannelCreationTime)
1✔
1462
          .setChannelRef(oobChannel)
1✔
1463
          .build());
1✔
1464
      ChannelTracer subchannelTracer =
1✔
1465
          new ChannelTracer(subchannelLogId, maxTraceEvents, oobChannelCreationTime,
1✔
1466
              "Subchannel for " + addressGroup);
1467
      ChannelLogger subchannelLogger = new ChannelLoggerImpl(subchannelTracer, timeProvider);
1✔
1468
      final class ManagedOobChannelCallback extends InternalSubchannel.Callback {
1✔
1469
        @Override
1470
        void onTerminated(InternalSubchannel is) {
1471
          oobChannels.remove(oobChannel);
1✔
1472
          channelz.removeSubchannel(is);
1✔
1473
          oobChannel.handleSubchannelTerminated();
1✔
1474
          maybeTerminateChannel();
1✔
1475
        }
1✔
1476

1477
        @Override
1478
        void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) {
1479
          // TODO(chengyuanzhang): change to let LB policies explicitly manage OOB channel's
1480
          //  state and refresh name resolution if necessary.
1481
          handleInternalSubchannelState(newState);
1✔
1482
          oobChannel.handleSubchannelStateChange(newState);
1✔
1483
        }
1✔
1484
      }
1485

1486
      final InternalSubchannel internalSubchannel = new InternalSubchannel(
1✔
1487
          CreateSubchannelArgs.newBuilder().setAddresses(addressGroup).build(),
1✔
1488
          authority, userAgent, backoffPolicyProvider, oobTransportFactory,
1✔
1489
          oobTransportFactory.getScheduledExecutorService(), stopwatchSupplier, syncContext,
1✔
1490
          // All callback methods are run from syncContext
1491
          new ManagedOobChannelCallback(),
1492
          channelz,
1✔
1493
          callTracerFactory.create(),
1✔
1494
          subchannelTracer,
1495
          subchannelLogId,
1496
          subchannelLogger,
1497
          transportFilters);
1✔
1498
      oobChannelTracer.reportEvent(new ChannelTrace.Event.Builder()
1✔
1499
          .setDescription("Child Subchannel created")
1✔
1500
          .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
1✔
1501
          .setTimestampNanos(oobChannelCreationTime)
1✔
1502
          .setSubchannelRef(internalSubchannel)
1✔
1503
          .build());
1✔
1504
      channelz.addSubchannel(oobChannel);
1✔
1505
      channelz.addSubchannel(internalSubchannel);
1✔
1506
      oobChannel.setSubchannel(internalSubchannel);
1✔
1507
      final class AddOobChannel implements Runnable {
1✔
1508
        @Override
1509
        public void run() {
1510
          if (terminating) {
1✔
1511
            oobChannel.shutdown();
×
1512
          }
1513
          if (!terminated) {
1✔
1514
            // If channel has not terminated, it will track the subchannel and block termination
1515
            // for it.
1516
            oobChannels.add(oobChannel);
1✔
1517
          }
1518
        }
1✔
1519
      }
1520

1521
      syncContext.execute(new AddOobChannel());
1✔
1522
      return oobChannel;
1✔
1523
    }
1524

1525
    @Deprecated
1526
    @Override
1527
    public ManagedChannelBuilder<?> createResolvingOobChannelBuilder(String target) {
1528
      return createResolvingOobChannelBuilder(target, new DefaultChannelCreds())
1✔
1529
          // Override authority to keep the old behavior.
1530
          // createResolvingOobChannelBuilder(String target) will be deleted soon.
1531
          .overrideAuthority(getAuthority());
1✔
1532
    }
1533

1534
    // TODO(creamsoup) prevent main channel to shutdown if oob channel is not terminated
1535
    // TODO(zdapeng) register the channel as a subchannel of the parent channel in channelz.
1536
    @Override
1537
    public ManagedChannelBuilder<?> createResolvingOobChannelBuilder(
1538
        final String target, final ChannelCredentials channelCreds) {
1539
      checkNotNull(channelCreds, "channelCreds");
1✔
1540

1541
      final class ResolvingOobChannelBuilder
1542
          extends ForwardingChannelBuilder2<ResolvingOobChannelBuilder> {
1543
        final ManagedChannelBuilder<?> delegate;
1544

1545
        ResolvingOobChannelBuilder() {
1✔
1546
          final ClientTransportFactory transportFactory;
1547
          CallCredentials callCredentials;
1548
          if (channelCreds instanceof DefaultChannelCreds) {
1✔
1549
            transportFactory = originalTransportFactory;
1✔
1550
            callCredentials = null;
1✔
1551
          } else {
1552
            SwapChannelCredentialsResult swapResult =
1✔
1553
                originalTransportFactory.swapChannelCredentials(channelCreds);
1✔
1554
            if (swapResult == null) {
1✔
1555
              delegate = Grpc.newChannelBuilder(target, channelCreds);
×
1556
              return;
×
1557
            } else {
1558
              transportFactory = swapResult.transportFactory;
1✔
1559
              callCredentials = swapResult.callCredentials;
1✔
1560
            }
1561
          }
1562
          ClientTransportFactoryBuilder transportFactoryBuilder =
1✔
1563
              new ClientTransportFactoryBuilder() {
1✔
1564
                @Override
1565
                public ClientTransportFactory buildClientTransportFactory() {
1566
                  return transportFactory;
1✔
1567
                }
1568
              };
1569
          delegate = new ManagedChannelImplBuilder(
1✔
1570
              target,
1571
              channelCreds,
1572
              callCredentials,
1573
              transportFactoryBuilder,
1574
              new FixedPortProvider(nameResolverArgs.getDefaultPort()))
1✔
1575
              .nameResolverRegistry(nameResolverRegistry);
1✔
1576
        }
1✔
1577

1578
        @Override
1579
        protected ManagedChannelBuilder<?> delegate() {
1580
          return delegate;
1✔
1581
        }
1582
      }
1583

1584
      checkState(!terminated, "Channel is terminated");
1✔
1585

1586
      @SuppressWarnings("deprecation")
1587
      ResolvingOobChannelBuilder builder = new ResolvingOobChannelBuilder();
1✔
1588

1589
      return builder
1✔
1590
          // TODO(zdapeng): executors should not outlive the parent channel.
1591
          .executor(executor)
1✔
1592
          .offloadExecutor(offloadExecutorHolder.getExecutor())
1✔
1593
          .maxTraceEvents(maxTraceEvents)
1✔
1594
          .proxyDetector(nameResolverArgs.getProxyDetector())
1✔
1595
          .userAgent(userAgent);
1✔
1596
    }
1597

1598
    @Override
1599
    public ChannelCredentials getUnsafeChannelCredentials() {
1600
      if (originalChannelCreds == null) {
1✔
1601
        return new DefaultChannelCreds();
1✔
1602
      }
1603
      return originalChannelCreds;
×
1604
    }
1605

1606
    @Override
1607
    public void updateOobChannelAddresses(ManagedChannel channel, EquivalentAddressGroup eag) {
1608
      updateOobChannelAddresses(channel, Collections.singletonList(eag));
×
1609
    }
×
1610

1611
    @Override
1612
    public void updateOobChannelAddresses(ManagedChannel channel,
1613
        List<EquivalentAddressGroup> eag) {
1614
      checkArgument(channel instanceof OobChannel,
1✔
1615
          "channel must have been returned from createOobChannel");
1616
      ((OobChannel) channel).updateAddresses(eag);
1✔
1617
    }
1✔
1618

1619
    @Override
1620
    public String getAuthority() {
1621
      return ManagedChannelImpl.this.authority();
1✔
1622
    }
1623

1624
    @Override
1625
    public String getChannelTarget() {
1626
      return targetUri.toString();
1✔
1627
    }
1628

1629
    @Override
1630
    public SynchronizationContext getSynchronizationContext() {
1631
      return syncContext;
1✔
1632
    }
1633

1634
    @Override
1635
    public ScheduledExecutorService getScheduledExecutorService() {
1636
      return scheduledExecutor;
1✔
1637
    }
1638

1639
    @Override
1640
    public ChannelLogger getChannelLogger() {
1641
      return channelLogger;
1✔
1642
    }
1643

1644
    @Override
1645
    public NameResolver.Args getNameResolverArgs() {
1646
      return nameResolverArgs;
1✔
1647
    }
1648

1649
    @Override
1650
    public NameResolverRegistry getNameResolverRegistry() {
1651
      return nameResolverRegistry;
1✔
1652
    }
1653

1654
    @Override
1655
    public MetricRecorder getMetricRecorder() {
1656
      return metricRecorder;
1✔
1657
    }
1658

1659
    /**
1660
     * A placeholder for channel creds if user did not specify channel creds for the channel.
1661
     */
1662
    // TODO(zdapeng): get rid of this class and let all ChannelBuilders always provide a non-null
1663
    //     channel creds.
1664
    final class DefaultChannelCreds extends ChannelCredentials {
1✔
1665
      @Override
1666
      public ChannelCredentials withoutBearerTokens() {
1667
        return this;
×
1668
      }
1669
    }
1670
  }
1671

1672
  final class NameResolverListener extends NameResolver.Listener2 {
1673
    final LbHelperImpl helper;
1674
    final NameResolver resolver;
1675

1676
    NameResolverListener(LbHelperImpl helperImpl, NameResolver resolver) {
1✔
1677
      this.helper = checkNotNull(helperImpl, "helperImpl");
1✔
1678
      this.resolver = checkNotNull(resolver, "resolver");
1✔
1679
    }
1✔
1680

1681
    @Override
1682
    public void onResult(final ResolutionResult resolutionResult) {
1683
      final class NamesResolved implements Runnable {
1✔
1684

1685
        @Override
1686
        public void run() {
1687
          Status status = onResult2(resolutionResult);
1✔
1688
          ResolutionResultListener resolutionResultListener = resolutionResult.getAttributes()
1✔
1689
              .get(RetryingNameResolver.RESOLUTION_RESULT_LISTENER_KEY);
1✔
1690
          resolutionResultListener.resolutionAttempted(status);
1✔
1691
        }
1✔
1692
      }
1693

1694
      syncContext.execute(new NamesResolved());
1✔
1695
    }
1✔
1696

1697
    @SuppressWarnings("ReferenceEquality")
1698
    @Override
1699
    public Status onResult2(final ResolutionResult resolutionResult) {
1700
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1701
      if (ManagedChannelImpl.this.nameResolver != resolver) {
1✔
1702
        return Status.OK;
1✔
1703
      }
1704

1705
      StatusOr<List<EquivalentAddressGroup>> serversOrError =
1✔
1706
          resolutionResult.getAddressesOrError();
1✔
1707
      if (!serversOrError.hasValue()) {
1✔
1708
        handleErrorInSyncContext(serversOrError.getStatus());
1✔
1709
        return serversOrError.getStatus();
1✔
1710
      }
1711
      List<EquivalentAddressGroup> servers = serversOrError.getValue();
1✔
1712
      channelLogger.log(
1✔
1713
          ChannelLogLevel.DEBUG,
1714
          "Resolved address: {0}, config={1}",
1715
          servers,
1716
          resolutionResult.getAttributes());
1✔
1717

1718
      if (lastResolutionState != ResolutionState.SUCCESS) {
1✔
1719
        channelLogger.log(ChannelLogLevel.INFO, "Address resolved: {0}",
1✔
1720
            servers);
1721
        lastResolutionState = ResolutionState.SUCCESS;
1✔
1722
      }
1723
      ConfigOrError configOrError = resolutionResult.getServiceConfig();
1✔
1724
      InternalConfigSelector resolvedConfigSelector =
1✔
1725
          resolutionResult.getAttributes().get(InternalConfigSelector.KEY);
1✔
1726
      ManagedChannelServiceConfig validServiceConfig =
1727
          configOrError != null && configOrError.getConfig() != null
1✔
1728
              ? (ManagedChannelServiceConfig) configOrError.getConfig()
1✔
1729
              : null;
1✔
1730
      Status serviceConfigError = configOrError != null ? configOrError.getError() : null;
1✔
1731

1732
      ManagedChannelServiceConfig effectiveServiceConfig;
1733
      if (!lookUpServiceConfig) {
1✔
1734
        if (validServiceConfig != null) {
1✔
1735
          channelLogger.log(
1✔
1736
              ChannelLogLevel.INFO,
1737
              "Service config from name resolver discarded by channel settings");
1738
        }
1739
        effectiveServiceConfig =
1740
            defaultServiceConfig == null ? EMPTY_SERVICE_CONFIG : defaultServiceConfig;
1✔
1741
        if (resolvedConfigSelector != null) {
1✔
1742
          channelLogger.log(
1✔
1743
              ChannelLogLevel.INFO,
1744
              "Config selector from name resolver discarded by channel settings");
1745
        }
1746
        realChannel.updateConfigSelector(effectiveServiceConfig.getDefaultConfigSelector());
1✔
1747
      } else {
1748
        // Try to use config if returned from name resolver
1749
        // Otherwise, try to use the default config if available
1750
        if (validServiceConfig != null) {
1✔
1751
          effectiveServiceConfig = validServiceConfig;
1✔
1752
          if (resolvedConfigSelector != null) {
1✔
1753
            realChannel.updateConfigSelector(resolvedConfigSelector);
1✔
1754
            if (effectiveServiceConfig.getDefaultConfigSelector() != null) {
1✔
1755
              channelLogger.log(
×
1756
                  ChannelLogLevel.DEBUG,
1757
                  "Method configs in service config will be discarded due to presence of"
1758
                      + "config-selector");
1759
            }
1760
          } else {
1761
            realChannel.updateConfigSelector(effectiveServiceConfig.getDefaultConfigSelector());
1✔
1762
          }
1763
        } else if (defaultServiceConfig != null) {
1✔
1764
          effectiveServiceConfig = defaultServiceConfig;
1✔
1765
          realChannel.updateConfigSelector(effectiveServiceConfig.getDefaultConfigSelector());
1✔
1766
          channelLogger.log(
1✔
1767
              ChannelLogLevel.INFO,
1768
              "Received no service config, using default service config");
1769
        } else if (serviceConfigError != null) {
1✔
1770
          if (!serviceConfigUpdated) {
1✔
1771
            // First DNS lookup has invalid service config, and cannot fall back to default
1772
            channelLogger.log(
1✔
1773
                ChannelLogLevel.INFO,
1774
                "Fallback to error due to invalid first service config without default config");
1775
            // This error could be an "inappropriate" control plane error that should not bleed
1776
            // through to client code using gRPC. We let them flow through here to the LB as
1777
            // we later check for these error codes when investigating pick results in
1778
            // GrpcUtil.getTransportFromPickResult().
1779
            onError(configOrError.getError());
1✔
1780
            return configOrError.getError();
1✔
1781
          } else {
1782
            effectiveServiceConfig = lastServiceConfig;
1✔
1783
          }
1784
        } else {
1785
          effectiveServiceConfig = EMPTY_SERVICE_CONFIG;
1✔
1786
          realChannel.updateConfigSelector(null);
1✔
1787
        }
1788
        if (!effectiveServiceConfig.equals(lastServiceConfig)) {
1✔
1789
          channelLogger.log(
1✔
1790
              ChannelLogLevel.INFO,
1791
              "Service config changed{0}",
1792
              effectiveServiceConfig == EMPTY_SERVICE_CONFIG ? " to empty" : "");
1✔
1793
          lastServiceConfig = effectiveServiceConfig;
1✔
1794
          transportProvider.throttle = effectiveServiceConfig.getRetryThrottling();
1✔
1795
        }
1796

1797
        try {
1798
          // TODO(creamsoup): when `serversOrError` is empty and lastResolutionStateCopy == SUCCESS
1799
          //  and lbNeedAddress, it shouldn't call the handleServiceConfigUpdate. But,
1800
          //  lbNeedAddress is not deterministic
1801
          serviceConfigUpdated = true;
1✔
1802
        } catch (RuntimeException re) {
×
1803
          logger.log(
×
1804
              Level.WARNING,
1805
              "[" + getLogId() + "] Unexpected exception from parsing service config",
×
1806
              re);
1807
        }
1✔
1808
      }
1809

1810
      Attributes effectiveAttrs = resolutionResult.getAttributes();
1✔
1811
      // Call LB only if it's not shutdown.  If LB is shutdown, lbHelper won't match.
1812
      if (NameResolverListener.this.helper == ManagedChannelImpl.this.lbHelper) {
1✔
1813
        Attributes.Builder attrBuilder =
1✔
1814
            effectiveAttrs.toBuilder().discard(InternalConfigSelector.KEY);
1✔
1815
        Map<String, ?> healthCheckingConfig =
1✔
1816
            effectiveServiceConfig.getHealthCheckingConfig();
1✔
1817
        if (healthCheckingConfig != null) {
1✔
1818
          attrBuilder
1✔
1819
              .set(LoadBalancer.ATTR_HEALTH_CHECKING_CONFIG, healthCheckingConfig)
1✔
1820
              .build();
1✔
1821
        }
1822
        Attributes attributes = attrBuilder.build();
1✔
1823

1824
        ResolvedAddresses.Builder resolvedAddresses = ResolvedAddresses.newBuilder()
1✔
1825
            .setAddresses(serversOrError.getValue())
1✔
1826
            .setAttributes(attributes)
1✔
1827
            .setLoadBalancingPolicyConfig(effectiveServiceConfig.getLoadBalancingConfig());
1✔
1828
        Status addressAcceptanceStatus = helper.lb.tryAcceptResolvedAddresses(
1✔
1829
            resolvedAddresses.build());
1✔
1830
        return addressAcceptanceStatus;
1✔
1831
      }
1832
      return Status.OK;
×
1833
    }
1834

1835
    @Override
1836
    public void onError(final Status error) {
1837
      checkArgument(!error.isOk(), "the error status must not be OK");
1✔
1838
      final class NameResolverErrorHandler implements Runnable {
1✔
1839
        @Override
1840
        public void run() {
1841
          handleErrorInSyncContext(error);
1✔
1842
        }
1✔
1843
      }
1844

1845
      syncContext.execute(new NameResolverErrorHandler());
1✔
1846
    }
1✔
1847

1848
    private void handleErrorInSyncContext(Status error) {
1849
      logger.log(Level.WARNING, "[{0}] Failed to resolve name. status={1}",
1✔
1850
          new Object[] {getLogId(), error});
1✔
1851
      realChannel.onConfigError();
1✔
1852
      if (lastResolutionState != ResolutionState.ERROR) {
1✔
1853
        channelLogger.log(ChannelLogLevel.WARNING, "Failed to resolve name: {0}", error);
1✔
1854
        lastResolutionState = ResolutionState.ERROR;
1✔
1855
      }
1856
      // Call LB only if it's not shutdown.  If LB is shutdown, lbHelper won't match.
1857
      if (NameResolverListener.this.helper != ManagedChannelImpl.this.lbHelper) {
1✔
1858
        return;
1✔
1859
      }
1860

1861
      helper.lb.handleNameResolutionError(error);
1✔
1862
    }
1✔
1863
  }
1864

1865
  private final class SubchannelImpl extends AbstractSubchannel {
1866
    final CreateSubchannelArgs args;
1867
    final InternalLogId subchannelLogId;
1868
    final ChannelLoggerImpl subchannelLogger;
1869
    final ChannelTracer subchannelTracer;
1870
    List<EquivalentAddressGroup> addressGroups;
1871
    InternalSubchannel subchannel;
1872
    boolean started;
1873
    boolean shutdown;
1874
    ScheduledHandle delayedShutdownTask;
1875

1876
    SubchannelImpl(CreateSubchannelArgs args) {
1✔
1877
      checkNotNull(args, "args");
1✔
1878
      addressGroups = args.getAddresses();
1✔
1879
      if (authorityOverride != null) {
1✔
1880
        List<EquivalentAddressGroup> eagsWithoutOverrideAttr =
1✔
1881
            stripOverrideAuthorityAttributes(args.getAddresses());
1✔
1882
        args = args.toBuilder().setAddresses(eagsWithoutOverrideAttr).build();
1✔
1883
      }
1884
      this.args = args;
1✔
1885
      subchannelLogId = InternalLogId.allocate("Subchannel", /*details=*/ authority());
1✔
1886
      subchannelTracer = new ChannelTracer(
1✔
1887
          subchannelLogId, maxTraceEvents, timeProvider.currentTimeNanos(),
1✔
1888
          "Subchannel for " + args.getAddresses());
1✔
1889
      subchannelLogger = new ChannelLoggerImpl(subchannelTracer, timeProvider);
1✔
1890
    }
1✔
1891

1892
    @Override
1893
    public void start(final SubchannelStateListener listener) {
1894
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1895
      checkState(!started, "already started");
1✔
1896
      checkState(!shutdown, "already shutdown");
1✔
1897
      checkState(!terminating, "Channel is being terminated");
1✔
1898
      started = true;
1✔
1899
      final class ManagedInternalSubchannelCallback extends InternalSubchannel.Callback {
1✔
1900
        // All callbacks are run in syncContext
1901
        @Override
1902
        void onTerminated(InternalSubchannel is) {
1903
          subchannels.remove(is);
1✔
1904
          channelz.removeSubchannel(is);
1✔
1905
          maybeTerminateChannel();
1✔
1906
        }
1✔
1907

1908
        @Override
1909
        void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) {
1910
          checkState(listener != null, "listener is null");
1✔
1911
          listener.onSubchannelState(newState);
1✔
1912
        }
1✔
1913

1914
        @Override
1915
        void onInUse(InternalSubchannel is) {
1916
          inUseStateAggregator.updateObjectInUse(is, true);
1✔
1917
        }
1✔
1918

1919
        @Override
1920
        void onNotInUse(InternalSubchannel is) {
1921
          inUseStateAggregator.updateObjectInUse(is, false);
1✔
1922
        }
1✔
1923
      }
1924

1925
      final InternalSubchannel internalSubchannel = new InternalSubchannel(
1✔
1926
          args,
1927
          authority(),
1✔
1928
          userAgent,
1✔
1929
          backoffPolicyProvider,
1✔
1930
          transportFactory,
1✔
1931
          transportFactory.getScheduledExecutorService(),
1✔
1932
          stopwatchSupplier,
1✔
1933
          syncContext,
1934
          new ManagedInternalSubchannelCallback(),
1935
          channelz,
1✔
1936
          callTracerFactory.create(),
1✔
1937
          subchannelTracer,
1938
          subchannelLogId,
1939
          subchannelLogger,
1940
          transportFilters);
1✔
1941

1942
      channelTracer.reportEvent(new ChannelTrace.Event.Builder()
1✔
1943
          .setDescription("Child Subchannel started")
1✔
1944
          .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
1✔
1945
          .setTimestampNanos(timeProvider.currentTimeNanos())
1✔
1946
          .setSubchannelRef(internalSubchannel)
1✔
1947
          .build());
1✔
1948

1949
      this.subchannel = internalSubchannel;
1✔
1950
      channelz.addSubchannel(internalSubchannel);
1✔
1951
      subchannels.add(internalSubchannel);
1✔
1952
    }
1✔
1953

1954
    @Override
1955
    InternalInstrumented<ChannelStats> getInstrumentedInternalSubchannel() {
1956
      checkState(started, "not started");
1✔
1957
      return subchannel;
1✔
1958
    }
1959

1960
    @Override
1961
    public void shutdown() {
1962
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1963
      if (subchannel == null) {
1✔
1964
        // start() was not successful
1965
        shutdown = true;
×
1966
        return;
×
1967
      }
1968
      if (shutdown) {
1✔
1969
        if (terminating && delayedShutdownTask != null) {
1✔
1970
          // shutdown() was previously called when terminating == false, thus a delayed shutdown()
1971
          // was scheduled.  Now since terminating == true, We should expedite the shutdown.
1972
          delayedShutdownTask.cancel();
×
1973
          delayedShutdownTask = null;
×
1974
          // Will fall through to the subchannel.shutdown() at the end.
1975
        } else {
1976
          return;
1✔
1977
        }
1978
      } else {
1979
        shutdown = true;
1✔
1980
      }
1981
      // Add a delay to shutdown to deal with the race between 1) a transport being picked and
1982
      // newStream() being called on it, and 2) its Subchannel is shut down by LoadBalancer (e.g.,
1983
      // because of address change, or because LoadBalancer is shutdown by Channel entering idle
1984
      // mode). If (2) wins, the app will see a spurious error. We work around this by delaying
1985
      // shutdown of Subchannel for a few seconds here.
1986
      //
1987
      // TODO(zhangkun83): consider a better approach
1988
      // (https://github.com/grpc/grpc-java/issues/2562).
1989
      if (!terminating) {
1✔
1990
        final class ShutdownSubchannel implements Runnable {
1✔
1991
          @Override
1992
          public void run() {
1993
            subchannel.shutdown(SUBCHANNEL_SHUTDOWN_STATUS);
1✔
1994
          }
1✔
1995
        }
1996

1997
        delayedShutdownTask = syncContext.schedule(
1✔
1998
            new LogExceptionRunnable(new ShutdownSubchannel()),
1999
            SUBCHANNEL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS,
2000
            transportFactory.getScheduledExecutorService());
1✔
2001
        return;
1✔
2002
      }
2003
      // When terminating == true, no more real streams will be created. It's safe and also
2004
      // desirable to shutdown timely.
2005
      subchannel.shutdown(SHUTDOWN_STATUS);
1✔
2006
    }
1✔
2007

2008
    @Override
2009
    public void requestConnection() {
2010
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
2011
      checkState(started, "not started");
1✔
2012
      subchannel.obtainActiveTransport();
1✔
2013
    }
1✔
2014

2015
    @Override
2016
    public List<EquivalentAddressGroup> getAllAddresses() {
2017
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
2018
      checkState(started, "not started");
1✔
2019
      return addressGroups;
1✔
2020
    }
2021

2022
    @Override
2023
    public Attributes getAttributes() {
2024
      return args.getAttributes();
1✔
2025
    }
2026

2027
    @Override
2028
    public String toString() {
2029
      return subchannelLogId.toString();
1✔
2030
    }
2031

2032
    @Override
2033
    public Channel asChannel() {
2034
      checkState(started, "not started");
1✔
2035
      return new SubchannelChannel(
1✔
2036
          subchannel, balancerRpcExecutorHolder.getExecutor(),
1✔
2037
          transportFactory.getScheduledExecutorService(),
1✔
2038
          callTracerFactory.create(),
1✔
2039
          new AtomicReference<InternalConfigSelector>(null));
2040
    }
2041

2042
    @Override
2043
    public Object getInternalSubchannel() {
2044
      checkState(started, "Subchannel is not started");
1✔
2045
      return subchannel;
1✔
2046
    }
2047

2048
    @Override
2049
    public ChannelLogger getChannelLogger() {
2050
      return subchannelLogger;
1✔
2051
    }
2052

2053
    @Override
2054
    public void updateAddresses(List<EquivalentAddressGroup> addrs) {
2055
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
2056
      addressGroups = addrs;
1✔
2057
      if (authorityOverride != null) {
1✔
2058
        addrs = stripOverrideAuthorityAttributes(addrs);
1✔
2059
      }
2060
      subchannel.updateAddresses(addrs);
1✔
2061
    }
1✔
2062

2063
    @Override
2064
    public Attributes getConnectedAddressAttributes() {
2065
      return subchannel.getConnectedAddressAttributes();
1✔
2066
    }
2067

2068
    private List<EquivalentAddressGroup> stripOverrideAuthorityAttributes(
2069
        List<EquivalentAddressGroup> eags) {
2070
      List<EquivalentAddressGroup> eagsWithoutOverrideAttr = new ArrayList<>();
1✔
2071
      for (EquivalentAddressGroup eag : eags) {
1✔
2072
        EquivalentAddressGroup eagWithoutOverrideAttr = new EquivalentAddressGroup(
1✔
2073
            eag.getAddresses(),
1✔
2074
            eag.getAttributes().toBuilder().discard(ATTR_AUTHORITY_OVERRIDE).build());
1✔
2075
        eagsWithoutOverrideAttr.add(eagWithoutOverrideAttr);
1✔
2076
      }
1✔
2077
      return Collections.unmodifiableList(eagsWithoutOverrideAttr);
1✔
2078
    }
2079
  }
2080

2081
  @Override
2082
  public String toString() {
2083
    return MoreObjects.toStringHelper(this)
1✔
2084
        .add("logId", logId.getId())
1✔
2085
        .add("target", target)
1✔
2086
        .toString();
1✔
2087
  }
2088

2089
  /**
2090
   * Called from syncContext.
2091
   */
2092
  private final class DelayedTransportListener implements ManagedClientTransport.Listener {
1✔
2093
    @Override
2094
    public void transportShutdown(Status s) {
2095
      checkState(shutdown.get(), "Channel must have been shut down");
1✔
2096
    }
1✔
2097

2098
    @Override
2099
    public void transportReady() {
2100
      // Don't care
2101
    }
×
2102

2103
    @Override
2104
    public Attributes filterTransport(Attributes attributes) {
2105
      return attributes;
×
2106
    }
2107

2108
    @Override
2109
    public void transportInUse(final boolean inUse) {
2110
      inUseStateAggregator.updateObjectInUse(delayedTransport, inUse);
1✔
2111
      if (inUse) {
1✔
2112
        // It's possible to be in idle mode while inUseStateAggregator is in-use, if one of the
2113
        // subchannels is in use. But we should never be in idle mode when delayed transport is in
2114
        // use.
2115
        exitIdleMode();
1✔
2116
      }
2117
    }
1✔
2118

2119
    @Override
2120
    public void transportTerminated() {
2121
      checkState(shutdown.get(), "Channel must have been shut down");
1✔
2122
      terminating = true;
1✔
2123
      shutdownNameResolverAndLoadBalancer(false);
1✔
2124
      // No need to call channelStateManager since we are already in SHUTDOWN state.
2125
      // Until LoadBalancer is shutdown, it may still create new subchannels.  We catch them
2126
      // here.
2127
      maybeShutdownNowSubchannels();
1✔
2128
      maybeTerminateChannel();
1✔
2129
    }
1✔
2130
  }
2131

2132
  /**
2133
   * Must be accessed from syncContext.
2134
   */
2135
  private final class IdleModeStateAggregator extends InUseStateAggregator<Object> {
1✔
2136
    @Override
2137
    protected void handleInUse() {
2138
      exitIdleMode();
1✔
2139
    }
1✔
2140

2141
    @Override
2142
    protected void handleNotInUse() {
2143
      if (shutdown.get()) {
1✔
2144
        return;
1✔
2145
      }
2146
      rescheduleIdleTimer();
1✔
2147
    }
1✔
2148
  }
2149

2150
  /**
2151
   * Lazily request for Executor from an executor pool.
2152
   * Also act as an Executor directly to simply run a cmd
2153
   */
2154
  @VisibleForTesting
2155
  static final class ExecutorHolder implements Executor {
2156
    private final ObjectPool<? extends Executor> pool;
2157
    private Executor executor;
2158

2159
    ExecutorHolder(ObjectPool<? extends Executor> executorPool) {
1✔
2160
      this.pool = checkNotNull(executorPool, "executorPool");
1✔
2161
    }
1✔
2162

2163
    synchronized Executor getExecutor() {
2164
      if (executor == null) {
1✔
2165
        executor = checkNotNull(pool.getObject(), "%s.getObject()", executor);
1✔
2166
      }
2167
      return executor;
1✔
2168
    }
2169

2170
    synchronized void release() {
2171
      if (executor != null) {
1✔
2172
        executor = pool.returnObject(executor);
1✔
2173
      }
2174
    }
1✔
2175

2176
    @Override
2177
    public void execute(Runnable command) {
2178
      getExecutor().execute(command);
1✔
2179
    }
1✔
2180
  }
2181

2182
  private static final class RestrictedScheduledExecutor implements ScheduledExecutorService {
2183
    final ScheduledExecutorService delegate;
2184

2185
    private RestrictedScheduledExecutor(ScheduledExecutorService delegate) {
1✔
2186
      this.delegate = checkNotNull(delegate, "delegate");
1✔
2187
    }
1✔
2188

2189
    @Override
2190
    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
2191
      return delegate.schedule(callable, delay, unit);
×
2192
    }
2193

2194
    @Override
2195
    public ScheduledFuture<?> schedule(Runnable cmd, long delay, TimeUnit unit) {
2196
      return delegate.schedule(cmd, delay, unit);
1✔
2197
    }
2198

2199
    @Override
2200
    public ScheduledFuture<?> scheduleAtFixedRate(
2201
        Runnable command, long initialDelay, long period, TimeUnit unit) {
2202
      return delegate.scheduleAtFixedRate(command, initialDelay, period, unit);
1✔
2203
    }
2204

2205
    @Override
2206
    public ScheduledFuture<?> scheduleWithFixedDelay(
2207
        Runnable command, long initialDelay, long delay, TimeUnit unit) {
2208
      return delegate.scheduleWithFixedDelay(command, initialDelay, delay, unit);
×
2209
    }
2210

2211
    @Override
2212
    public boolean awaitTermination(long timeout, TimeUnit unit)
2213
        throws InterruptedException {
2214
      return delegate.awaitTermination(timeout, unit);
×
2215
    }
2216

2217
    @Override
2218
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
2219
        throws InterruptedException {
2220
      return delegate.invokeAll(tasks);
×
2221
    }
2222

2223
    @Override
2224
    public <T> List<Future<T>> invokeAll(
2225
        Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
2226
        throws InterruptedException {
2227
      return delegate.invokeAll(tasks, timeout, unit);
×
2228
    }
2229

2230
    @Override
2231
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
2232
        throws InterruptedException, ExecutionException {
2233
      return delegate.invokeAny(tasks);
×
2234
    }
2235

2236
    @Override
2237
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
2238
        throws InterruptedException, ExecutionException, TimeoutException {
2239
      return delegate.invokeAny(tasks, timeout, unit);
×
2240
    }
2241

2242
    @Override
2243
    public boolean isShutdown() {
2244
      return delegate.isShutdown();
×
2245
    }
2246

2247
    @Override
2248
    public boolean isTerminated() {
2249
      return delegate.isTerminated();
×
2250
    }
2251

2252
    @Override
2253
    public void shutdown() {
2254
      throw new UnsupportedOperationException("Restricted: shutdown() is not allowed");
1✔
2255
    }
2256

2257
    @Override
2258
    public List<Runnable> shutdownNow() {
2259
      throw new UnsupportedOperationException("Restricted: shutdownNow() is not allowed");
1✔
2260
    }
2261

2262
    @Override
2263
    public <T> Future<T> submit(Callable<T> task) {
2264
      return delegate.submit(task);
×
2265
    }
2266

2267
    @Override
2268
    public Future<?> submit(Runnable task) {
2269
      return delegate.submit(task);
×
2270
    }
2271

2272
    @Override
2273
    public <T> Future<T> submit(Runnable task, T result) {
2274
      return delegate.submit(task, result);
×
2275
    }
2276

2277
    @Override
2278
    public void execute(Runnable command) {
2279
      delegate.execute(command);
×
2280
    }
×
2281
  }
2282

2283
  /**
2284
   * A ResolutionState indicates the status of last name resolution.
2285
   */
2286
  enum ResolutionState {
1✔
2287
    NO_RESOLUTION,
1✔
2288
    SUCCESS,
1✔
2289
    ERROR
1✔
2290
  }
2291
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc