• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19975

08 Sep 2025 09:55PM UTC coverage: 88.547% (+0.01%) from 88.535%
#19975

push

github

web-flow
otel: subchannel metrics A94 (#12202)

Implements [A94](https://github.com/grpc/proposal/pull/485/files) except for the exact reason for disconnect_error

34806 of 39308 relevant lines covered (88.55%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.45
/../core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
1
/*
2
 * Copyright 2016 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static com.google.common.base.Preconditions.checkState;
22
import static io.grpc.ClientStreamTracer.NAME_RESOLUTION_DELAYED;
23
import static io.grpc.ConnectivityState.CONNECTING;
24
import static io.grpc.ConnectivityState.IDLE;
25
import static io.grpc.ConnectivityState.SHUTDOWN;
26
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
27
import static io.grpc.EquivalentAddressGroup.ATTR_AUTHORITY_OVERRIDE;
28

29
import com.google.common.annotations.VisibleForTesting;
30
import com.google.common.base.MoreObjects;
31
import com.google.common.base.Stopwatch;
32
import com.google.common.base.Supplier;
33
import com.google.common.util.concurrent.ListenableFuture;
34
import com.google.common.util.concurrent.SettableFuture;
35
import com.google.errorprone.annotations.concurrent.GuardedBy;
36
import io.grpc.Attributes;
37
import io.grpc.CallCredentials;
38
import io.grpc.CallOptions;
39
import io.grpc.Channel;
40
import io.grpc.ChannelCredentials;
41
import io.grpc.ChannelLogger;
42
import io.grpc.ChannelLogger.ChannelLogLevel;
43
import io.grpc.ClientCall;
44
import io.grpc.ClientInterceptor;
45
import io.grpc.ClientInterceptors;
46
import io.grpc.ClientStreamTracer;
47
import io.grpc.ClientTransportFilter;
48
import io.grpc.CompressorRegistry;
49
import io.grpc.ConnectivityState;
50
import io.grpc.ConnectivityStateInfo;
51
import io.grpc.Context;
52
import io.grpc.Deadline;
53
import io.grpc.DecompressorRegistry;
54
import io.grpc.EquivalentAddressGroup;
55
import io.grpc.ForwardingChannelBuilder2;
56
import io.grpc.ForwardingClientCall;
57
import io.grpc.Grpc;
58
import io.grpc.InternalChannelz;
59
import io.grpc.InternalChannelz.ChannelStats;
60
import io.grpc.InternalChannelz.ChannelTrace;
61
import io.grpc.InternalConfigSelector;
62
import io.grpc.InternalInstrumented;
63
import io.grpc.InternalLogId;
64
import io.grpc.InternalWithLogId;
65
import io.grpc.LoadBalancer;
66
import io.grpc.LoadBalancer.CreateSubchannelArgs;
67
import io.grpc.LoadBalancer.PickResult;
68
import io.grpc.LoadBalancer.PickSubchannelArgs;
69
import io.grpc.LoadBalancer.ResolvedAddresses;
70
import io.grpc.LoadBalancer.SubchannelPicker;
71
import io.grpc.LoadBalancer.SubchannelStateListener;
72
import io.grpc.ManagedChannel;
73
import io.grpc.ManagedChannelBuilder;
74
import io.grpc.Metadata;
75
import io.grpc.MethodDescriptor;
76
import io.grpc.MetricInstrumentRegistry;
77
import io.grpc.MetricRecorder;
78
import io.grpc.NameResolver;
79
import io.grpc.NameResolver.ConfigOrError;
80
import io.grpc.NameResolver.ResolutionResult;
81
import io.grpc.NameResolverProvider;
82
import io.grpc.NameResolverRegistry;
83
import io.grpc.ProxyDetector;
84
import io.grpc.Status;
85
import io.grpc.StatusOr;
86
import io.grpc.SynchronizationContext;
87
import io.grpc.SynchronizationContext.ScheduledHandle;
88
import io.grpc.internal.AutoConfiguredLoadBalancerFactory.AutoConfiguredLoadBalancer;
89
import io.grpc.internal.ClientCallImpl.ClientStreamProvider;
90
import io.grpc.internal.ClientTransportFactory.SwapChannelCredentialsResult;
91
import io.grpc.internal.ManagedChannelImplBuilder.ClientTransportFactoryBuilder;
92
import io.grpc.internal.ManagedChannelImplBuilder.FixedPortProvider;
93
import io.grpc.internal.ManagedChannelServiceConfig.MethodInfo;
94
import io.grpc.internal.ManagedChannelServiceConfig.ServiceConfigConvertedSelector;
95
import io.grpc.internal.RetriableStream.ChannelBufferMeter;
96
import io.grpc.internal.RetriableStream.Throttle;
97
import java.net.URI;
98
import java.util.ArrayList;
99
import java.util.Collection;
100
import java.util.Collections;
101
import java.util.HashSet;
102
import java.util.LinkedHashSet;
103
import java.util.List;
104
import java.util.Map;
105
import java.util.Set;
106
import java.util.concurrent.Callable;
107
import java.util.concurrent.CountDownLatch;
108
import java.util.concurrent.ExecutionException;
109
import java.util.concurrent.Executor;
110
import java.util.concurrent.Future;
111
import java.util.concurrent.ScheduledExecutorService;
112
import java.util.concurrent.ScheduledFuture;
113
import java.util.concurrent.TimeUnit;
114
import java.util.concurrent.TimeoutException;
115
import java.util.concurrent.atomic.AtomicBoolean;
116
import java.util.concurrent.atomic.AtomicReference;
117
import java.util.logging.Level;
118
import java.util.logging.Logger;
119
import javax.annotation.Nullable;
120
import javax.annotation.concurrent.ThreadSafe;
121

122
/** A communication channel for making outgoing RPCs. */
123
@ThreadSafe
124
final class ManagedChannelImpl extends ManagedChannel implements
125
    InternalInstrumented<ChannelStats> {
126
  @VisibleForTesting
127
  static final Logger logger = Logger.getLogger(ManagedChannelImpl.class.getName());
1✔
128

129
  static final long IDLE_TIMEOUT_MILLIS_DISABLE = -1;
130

131
  static final long SUBCHANNEL_SHUTDOWN_DELAY_SECONDS = 5;
132

133
  @VisibleForTesting
134
  static final Status SHUTDOWN_NOW_STATUS =
1✔
135
      Status.UNAVAILABLE.withDescription("Channel shutdownNow invoked");
1✔
136

137
  @VisibleForTesting
138
  static final Status SHUTDOWN_STATUS =
1✔
139
      Status.UNAVAILABLE.withDescription("Channel shutdown invoked");
1✔
140

141
  @VisibleForTesting
142
  static final Status SUBCHANNEL_SHUTDOWN_STATUS =
1✔
143
      Status.UNAVAILABLE.withDescription("Subchannel shutdown invoked");
1✔
144

145
  private static final ManagedChannelServiceConfig EMPTY_SERVICE_CONFIG =
146
      ManagedChannelServiceConfig.empty();
1✔
147
  private static final InternalConfigSelector INITIAL_PENDING_SELECTOR =
1✔
148
      new InternalConfigSelector() {
1✔
149
        @Override
150
        public Result selectConfig(PickSubchannelArgs args) {
151
          throw new IllegalStateException("Resolution is pending");
×
152
        }
153
      };
154
  private static final LoadBalancer.PickDetailsConsumer NOOP_PICK_DETAILS_CONSUMER =
1✔
155
      new LoadBalancer.PickDetailsConsumer() {};
1✔
156

157
  private final InternalLogId logId;
158
  private final String target;
159
  @Nullable
160
  private final String authorityOverride;
161
  private final NameResolverRegistry nameResolverRegistry;
162
  private final URI targetUri;
163
  private final NameResolverProvider nameResolverProvider;
164
  private final NameResolver.Args nameResolverArgs;
165
  private final AutoConfiguredLoadBalancerFactory loadBalancerFactory;
166
  private final ClientTransportFactory originalTransportFactory;
167
  @Nullable
168
  private final ChannelCredentials originalChannelCreds;
169
  private final ClientTransportFactory transportFactory;
170
  private final ClientTransportFactory oobTransportFactory;
171
  private final RestrictedScheduledExecutor scheduledExecutor;
172
  private final Executor executor;
173
  private final ObjectPool<? extends Executor> executorPool;
174
  private final ObjectPool<? extends Executor> balancerRpcExecutorPool;
175
  private final ExecutorHolder balancerRpcExecutorHolder;
176
  private final ExecutorHolder offloadExecutorHolder;
177
  private final TimeProvider timeProvider;
178
  private final int maxTraceEvents;
179

180
  @VisibleForTesting
1✔
181
  final SynchronizationContext syncContext = new SynchronizationContext(
182
      new Thread.UncaughtExceptionHandler() {
1✔
183
        @Override
184
        public void uncaughtException(Thread t, Throwable e) {
185
          logger.log(
1✔
186
              Level.SEVERE,
187
              "[" + getLogId() + "] Uncaught exception in the SynchronizationContext. Panic!",
1✔
188
              e);
189
          try {
190
            panic(e);
1✔
191
          } catch (Throwable anotherT) {
×
192
            logger.log(
×
193
                Level.SEVERE, "[" + getLogId() + "] Uncaught exception while panicking", anotherT);
×
194
          }
1✔
195
        }
1✔
196
      });
197

198
  private boolean fullStreamDecompression;
199

200
  private final DecompressorRegistry decompressorRegistry;
201
  private final CompressorRegistry compressorRegistry;
202

203
  private final Supplier<Stopwatch> stopwatchSupplier;
204
  /** The timeout before entering idle mode. */
205
  private final long idleTimeoutMillis;
206

207
  private final ConnectivityStateManager channelStateManager = new ConnectivityStateManager();
1✔
208
  private final BackoffPolicy.Provider backoffPolicyProvider;
209

210
  /**
211
   * We delegate to this channel, so that we can have interceptors as necessary. If there aren't
212
   * any interceptors and the {@link io.grpc.BinaryLog} is {@code null} then this will just be a
213
   * {@link RealChannel}.
214
   */
215
  private final Channel interceptorChannel;
216

217
  private final List<ClientTransportFilter> transportFilters;
218
  @Nullable private final String userAgent;
219

220
  // Only null after channel is terminated. Must be assigned from the syncContext.
221
  private NameResolver nameResolver;
222

223
  // Must be accessed from the syncContext.
224
  private boolean nameResolverStarted;
225

226
  // null when channel is in idle mode.  Must be assigned from syncContext.
227
  @Nullable
228
  private LbHelperImpl lbHelper;
229

230
  // Must be accessed from the syncContext
231
  private boolean panicMode;
232

233
  // Must be mutated from syncContext
234
  // If any monitoring hook to be added later needs to get a snapshot of this Set, we could
235
  // switch to a ConcurrentHashMap.
236
  private final Set<InternalSubchannel> subchannels = new HashSet<>(16, .75f);
1✔
237

238
  // Must be accessed from syncContext
239
  @Nullable
240
  private Collection<RealChannel.PendingCall<?, ?>> pendingCalls;
241
  private final Object pendingCallsInUseObject = new Object();
1✔
242

243
  // Must be mutated from syncContext
244
  private final Set<OobChannel> oobChannels = new HashSet<>(1, .75f);
1✔
245

246
  // reprocess() must be run from syncContext
247
  private final DelayedClientTransport delayedTransport;
248
  private final UncommittedRetriableStreamsRegistry uncommittedRetriableStreamsRegistry
1✔
249
      = new UncommittedRetriableStreamsRegistry();
250

251
  // Shutdown states.
252
  //
253
  // Channel's shutdown process:
254
  // 1. shutdown(): stop accepting new calls from applications
255
  //   1a shutdown <- true
256
  //   1b delayedTransport.shutdown()
257
  // 2. delayedTransport terminated: stop stream-creation functionality
258
  //   2a terminating <- true
259
  //   2b loadBalancer.shutdown()
260
  //     * LoadBalancer will shutdown subchannels and OOB channels
261
  //   2c loadBalancer <- null
262
  //   2d nameResolver.shutdown()
263
  //   2e nameResolver <- null
264
  // 3. All subchannels and OOB channels terminated: Channel considered terminated
265

266
  private final AtomicBoolean shutdown = new AtomicBoolean(false);
1✔
267
  // Must only be mutated and read from syncContext
268
  private boolean shutdownNowed;
269
  // Must only be mutated from syncContext
270
  private boolean terminating;
271
  // Must be mutated from syncContext
272
  private volatile boolean terminated;
273
  private final CountDownLatch terminatedLatch = new CountDownLatch(1);
1✔
274

275
  private final CallTracer.Factory callTracerFactory;
276
  private final CallTracer channelCallTracer;
277
  private final ChannelTracer channelTracer;
278
  private final ChannelLogger channelLogger;
279
  private final InternalChannelz channelz;
280
  private final RealChannel realChannel;
281
  // Must be mutated and read from syncContext
282
  // a flag for doing channel tracing when flipped
283
  private ResolutionState lastResolutionState = ResolutionState.NO_RESOLUTION;
1✔
284
  // Must be mutated and read from constructor or syncContext
285
  // used for channel tracing when value changed
286
  private ManagedChannelServiceConfig lastServiceConfig = EMPTY_SERVICE_CONFIG;
1✔
287

288
  @Nullable
289
  private final ManagedChannelServiceConfig defaultServiceConfig;
290
  // Must be mutated and read from constructor or syncContext
291
  private boolean serviceConfigUpdated = false;
1✔
292
  private final boolean lookUpServiceConfig;
293

294
  // One instance per channel.
295
  private final ChannelBufferMeter channelBufferUsed = new ChannelBufferMeter();
1✔
296

297
  private final long perRpcBufferLimit;
298
  private final long channelBufferLimit;
299

300
  // Temporary false flag that can skip the retry code path.
301
  private final boolean retryEnabled;
302

303
  private final Deadline.Ticker ticker = Deadline.getSystemTicker();
1✔
304

305
  // Called from syncContext
306
  private final ManagedClientTransport.Listener delayedTransportListener =
1✔
307
      new DelayedTransportListener();
308

309
  // Must be called from syncContext
310
  private void maybeShutdownNowSubchannels() {
311
    if (shutdownNowed) {
1✔
312
      for (InternalSubchannel subchannel : subchannels) {
1✔
313
        subchannel.shutdownNow(SHUTDOWN_NOW_STATUS);
1✔
314
      }
1✔
315
      for (OobChannel oobChannel : oobChannels) {
1✔
316
        oobChannel.getInternalSubchannel().shutdownNow(SHUTDOWN_NOW_STATUS);
1✔
317
      }
1✔
318
    }
319
  }
1✔
320

321
  // Must be accessed from syncContext
322
  @VisibleForTesting
1✔
323
  final InUseStateAggregator<Object> inUseStateAggregator = new IdleModeStateAggregator();
324

325
  @Override
326
  public ListenableFuture<ChannelStats> getStats() {
327
    final SettableFuture<ChannelStats> ret = SettableFuture.create();
1✔
328
    final class StatsFetcher implements Runnable {
1✔
329
      @Override
330
      public void run() {
331
        ChannelStats.Builder builder = new InternalChannelz.ChannelStats.Builder();
1✔
332
        channelCallTracer.updateBuilder(builder);
1✔
333
        channelTracer.updateBuilder(builder);
1✔
334
        builder.setTarget(target).setState(channelStateManager.getState());
1✔
335
        List<InternalWithLogId> children = new ArrayList<>();
1✔
336
        children.addAll(subchannels);
1✔
337
        children.addAll(oobChannels);
1✔
338
        builder.setSubchannels(children);
1✔
339
        ret.set(builder.build());
1✔
340
      }
1✔
341
    }
342

343
    // subchannels and oobchannels can only be accessed from syncContext
344
    syncContext.execute(new StatsFetcher());
1✔
345
    return ret;
1✔
346
  }
347

348
  @Override
349
  public InternalLogId getLogId() {
350
    return logId;
1✔
351
  }
352

353
  // Run from syncContext
354
  private class IdleModeTimer implements Runnable {
1✔
355

356
    @Override
357
    public void run() {
358
      // Workaround timer scheduled while in idle mode. This can happen from handleNotInUse() after
359
      // an explicit enterIdleMode() by the user. Protecting here as other locations are a bit too
360
      // subtle to change rapidly to resolve the channel panic. See #8714
361
      if (lbHelper == null) {
1✔
362
        return;
×
363
      }
364
      enterIdleMode();
1✔
365
    }
1✔
366
  }
367

368
  // Must be called from syncContext
369
  private void shutdownNameResolverAndLoadBalancer(boolean channelIsActive) {
370
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
371
    if (channelIsActive) {
1✔
372
      checkState(nameResolverStarted, "nameResolver is not started");
1✔
373
      checkState(lbHelper != null, "lbHelper is null");
1✔
374
    }
375
    if (nameResolver != null) {
1✔
376
      nameResolver.shutdown();
1✔
377
      nameResolverStarted = false;
1✔
378
      if (channelIsActive) {
1✔
379
        nameResolver = getNameResolver(
1✔
380
            targetUri, authorityOverride, nameResolverProvider, nameResolverArgs);
381
      } else {
382
        nameResolver = null;
1✔
383
      }
384
    }
385
    if (lbHelper != null) {
1✔
386
      lbHelper.lb.shutdown();
1✔
387
      lbHelper = null;
1✔
388
    }
389
  }
1✔
390

391
  /**
392
   * Make the channel exit idle mode, if it's in it.
393
   *
394
   * <p>Must be called from syncContext
395
   */
396
  @VisibleForTesting
397
  void exitIdleMode() {
398
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
399
    if (shutdown.get() || panicMode) {
1✔
400
      return;
1✔
401
    }
402
    if (inUseStateAggregator.isInUse()) {
1✔
403
      // Cancel the timer now, so that a racing due timer will not put Channel on idleness
404
      // when the caller of exitIdleMode() is about to use the returned loadBalancer.
405
      cancelIdleTimer(false);
1✔
406
    } else {
407
      // exitIdleMode() may be called outside of inUseStateAggregator.handleNotInUse() while
408
      // isInUse() == false, in which case we still need to schedule the timer.
409
      rescheduleIdleTimer();
1✔
410
    }
411
    if (lbHelper != null) {
1✔
412
      return;
1✔
413
    }
414
    channelLogger.log(ChannelLogLevel.INFO, "Exiting idle mode");
1✔
415
    LbHelperImpl lbHelper = new LbHelperImpl();
1✔
416
    lbHelper.lb = loadBalancerFactory.newLoadBalancer(lbHelper);
1✔
417
    // Delay setting lbHelper until fully initialized, since loadBalancerFactory is user code and
418
    // may throw. We don't want to confuse our state, even if we enter panic mode.
419
    this.lbHelper = lbHelper;
1✔
420

421
    channelStateManager.gotoState(CONNECTING);
1✔
422
    NameResolverListener listener = new NameResolverListener(lbHelper, nameResolver);
1✔
423
    nameResolver.start(listener);
1✔
424
    nameResolverStarted = true;
1✔
425
  }
1✔
426

427
  // Must be run from syncContext
428
  private void enterIdleMode() {
429
    // nameResolver and loadBalancer are guaranteed to be non-null.  If any of them were null,
430
    // either the idleModeTimer ran twice without exiting the idle mode, or the task in shutdown()
431
    // did not cancel idleModeTimer, or enterIdle() ran while shutdown or in idle, all of
432
    // which are bugs.
433
    shutdownNameResolverAndLoadBalancer(true);
1✔
434
    delayedTransport.reprocess(null);
1✔
435
    channelLogger.log(ChannelLogLevel.INFO, "Entering IDLE state");
1✔
436
    channelStateManager.gotoState(IDLE);
1✔
437
    // If the inUseStateAggregator still considers pending calls to be queued up or the delayed
438
    // transport to be holding some we need to exit idle mode to give these calls a chance to
439
    // be processed.
440
    if (inUseStateAggregator.anyObjectInUse(pendingCallsInUseObject, delayedTransport)) {
1✔
441
      exitIdleMode();
1✔
442
    }
443
  }
1✔
444

445
  // Must be run from syncContext
446
  private void cancelIdleTimer(boolean permanent) {
447
    idleTimer.cancel(permanent);
1✔
448
  }
1✔
449

450
  // Always run from syncContext
451
  private void rescheduleIdleTimer() {
452
    if (idleTimeoutMillis == IDLE_TIMEOUT_MILLIS_DISABLE) {
1✔
453
      return;
1✔
454
    }
455
    idleTimer.reschedule(idleTimeoutMillis, TimeUnit.MILLISECONDS);
1✔
456
  }
1✔
457

458
  /**
459
   * Force name resolution refresh to happen immediately. Must be run
460
   * from syncContext.
461
   */
462
  private void refreshNameResolution() {
463
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
464
    if (nameResolverStarted) {
1✔
465
      nameResolver.refresh();
1✔
466
    }
467
  }
1✔
468

469
  private final class ChannelStreamProvider implements ClientStreamProvider {
1✔
470
    volatile Throttle throttle;
471

472
    @Override
473
    public ClientStream newStream(
474
        final MethodDescriptor<?, ?> method,
475
        final CallOptions callOptions,
476
        final Metadata headers,
477
        final Context context) {
478
      // There is no need to reschedule the idle timer here. If the channel isn't shut down, either
479
      // the delayed transport or a real transport will go in-use and cancel the idle timer.
480
      if (!retryEnabled) {
1✔
481
        ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers(
1✔
482
            callOptions, headers, 0, /* isTransparentRetry= */ false);
483
        Context origContext = context.attach();
1✔
484
        try {
485
          return delayedTransport.newStream(method, headers, callOptions, tracers);
1✔
486
        } finally {
487
          context.detach(origContext);
1✔
488
        }
489
      } else {
490
        MethodInfo methodInfo = callOptions.getOption(MethodInfo.KEY);
1✔
491
        final RetryPolicy retryPolicy = methodInfo == null ? null : methodInfo.retryPolicy;
1✔
492
        final HedgingPolicy hedgingPolicy = methodInfo == null ? null : methodInfo.hedgingPolicy;
1✔
493
        final class RetryStream<ReqT> extends RetriableStream<ReqT> {
494
          @SuppressWarnings("unchecked")
495
          RetryStream() {
1✔
496
            super(
1✔
497
                (MethodDescriptor<ReqT, ?>) method,
498
                headers,
499
                channelBufferUsed,
1✔
500
                perRpcBufferLimit,
1✔
501
                channelBufferLimit,
1✔
502
                getCallExecutor(callOptions),
1✔
503
                transportFactory.getScheduledExecutorService(),
1✔
504
                retryPolicy,
505
                hedgingPolicy,
506
                throttle);
507
          }
1✔
508

509
          @Override
510
          Status prestart() {
511
            return uncommittedRetriableStreamsRegistry.add(this);
1✔
512
          }
513

514
          @Override
515
          void postCommit() {
516
            uncommittedRetriableStreamsRegistry.remove(this);
1✔
517
          }
1✔
518

519
          @Override
520
          ClientStream newSubstream(
521
              Metadata newHeaders, ClientStreamTracer.Factory factory, int previousAttempts,
522
              boolean isTransparentRetry) {
523
            CallOptions newOptions = callOptions.withStreamTracerFactory(factory);
1✔
524
            ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers(
1✔
525
                newOptions, newHeaders, previousAttempts, isTransparentRetry);
526
            Context origContext = context.attach();
1✔
527
            try {
528
              return delayedTransport.newStream(method, newHeaders, newOptions, tracers);
1✔
529
            } finally {
530
              context.detach(origContext);
1✔
531
            }
532
          }
533
        }
534

535
        return new RetryStream<>();
1✔
536
      }
537
    }
538
  }
539

540
  private final ChannelStreamProvider transportProvider = new ChannelStreamProvider();
1✔
541

542
  private final Rescheduler idleTimer;
543
  private final MetricRecorder metricRecorder;
544

545
  ManagedChannelImpl(
546
      ManagedChannelImplBuilder builder,
547
      ClientTransportFactory clientTransportFactory,
548
      URI targetUri,
549
      NameResolverProvider nameResolverProvider,
550
      BackoffPolicy.Provider backoffPolicyProvider,
551
      ObjectPool<? extends Executor> balancerRpcExecutorPool,
552
      Supplier<Stopwatch> stopwatchSupplier,
553
      List<ClientInterceptor> interceptors,
554
      final TimeProvider timeProvider) {
1✔
555
    this.target = checkNotNull(builder.target, "target");
1✔
556
    this.logId = InternalLogId.allocate("Channel", target);
1✔
557
    this.timeProvider = checkNotNull(timeProvider, "timeProvider");
1✔
558
    this.executorPool = checkNotNull(builder.executorPool, "executorPool");
1✔
559
    this.executor = checkNotNull(executorPool.getObject(), "executor");
1✔
560
    this.originalChannelCreds = builder.channelCredentials;
1✔
561
    this.originalTransportFactory = clientTransportFactory;
1✔
562
    this.offloadExecutorHolder =
1✔
563
        new ExecutorHolder(checkNotNull(builder.offloadExecutorPool, "offloadExecutorPool"));
1✔
564
    this.transportFactory = new CallCredentialsApplyingTransportFactory(
1✔
565
        clientTransportFactory, builder.callCredentials, this.offloadExecutorHolder);
566
    this.oobTransportFactory = new CallCredentialsApplyingTransportFactory(
1✔
567
        clientTransportFactory, null, this.offloadExecutorHolder);
568
    this.scheduledExecutor =
1✔
569
        new RestrictedScheduledExecutor(transportFactory.getScheduledExecutorService());
1✔
570
    maxTraceEvents = builder.maxTraceEvents;
1✔
571
    channelTracer = new ChannelTracer(
1✔
572
        logId, builder.maxTraceEvents, timeProvider.currentTimeNanos(),
1✔
573
        "Channel for '" + target + "'");
574
    channelLogger = new ChannelLoggerImpl(channelTracer, timeProvider);
1✔
575
    ProxyDetector proxyDetector =
576
        builder.proxyDetector != null ? builder.proxyDetector : GrpcUtil.DEFAULT_PROXY_DETECTOR;
1✔
577
    this.retryEnabled = builder.retryEnabled;
1✔
578
    this.loadBalancerFactory = new AutoConfiguredLoadBalancerFactory(builder.defaultLbPolicy);
1✔
579
    this.nameResolverRegistry = builder.nameResolverRegistry;
1✔
580
    this.targetUri = checkNotNull(targetUri, "targetUri");
1✔
581
    this.nameResolverProvider = checkNotNull(nameResolverProvider, "nameResolverProvider");
1✔
582
    ScParser serviceConfigParser =
1✔
583
        new ScParser(
584
            retryEnabled,
585
            builder.maxRetryAttempts,
586
            builder.maxHedgedAttempts,
587
            loadBalancerFactory);
588
    this.authorityOverride = builder.authorityOverride;
1✔
589
    this.metricRecorder = new MetricRecorderImpl(builder.metricSinks,
1✔
590
        MetricInstrumentRegistry.getDefaultRegistry());
1✔
591
    NameResolver.Args.Builder nameResolverArgsBuilder = NameResolver.Args.newBuilder()
1✔
592
            .setDefaultPort(builder.getDefaultPort())
1✔
593
            .setProxyDetector(proxyDetector)
1✔
594
            .setSynchronizationContext(syncContext)
1✔
595
            .setScheduledExecutorService(scheduledExecutor)
1✔
596
            .setServiceConfigParser(serviceConfigParser)
1✔
597
            .setChannelLogger(channelLogger)
1✔
598
            .setOffloadExecutor(this.offloadExecutorHolder)
1✔
599
            .setOverrideAuthority(this.authorityOverride)
1✔
600
            .setMetricRecorder(this.metricRecorder)
1✔
601
            .setNameResolverRegistry(builder.nameResolverRegistry);
1✔
602
    builder.copyAllNameResolverCustomArgsTo(nameResolverArgsBuilder);
1✔
603
    this.nameResolverArgs = nameResolverArgsBuilder.build();
1✔
604
    this.nameResolver = getNameResolver(
1✔
605
        targetUri, authorityOverride, nameResolverProvider, nameResolverArgs);
606
    this.balancerRpcExecutorPool = checkNotNull(balancerRpcExecutorPool, "balancerRpcExecutorPool");
1✔
607
    this.balancerRpcExecutorHolder = new ExecutorHolder(balancerRpcExecutorPool);
1✔
608
    this.delayedTransport = new DelayedClientTransport(this.executor, this.syncContext);
1✔
609
    this.delayedTransport.start(delayedTransportListener);
1✔
610
    this.backoffPolicyProvider = backoffPolicyProvider;
1✔
611

612
    if (builder.defaultServiceConfig != null) {
1✔
613
      ConfigOrError parsedDefaultServiceConfig =
1✔
614
          serviceConfigParser.parseServiceConfig(builder.defaultServiceConfig);
1✔
615
      checkState(
1✔
616
          parsedDefaultServiceConfig.getError() == null,
1✔
617
          "Default config is invalid: %s",
618
          parsedDefaultServiceConfig.getError());
1✔
619
      this.defaultServiceConfig =
1✔
620
          (ManagedChannelServiceConfig) parsedDefaultServiceConfig.getConfig();
1✔
621
      this.transportProvider.throttle = this.defaultServiceConfig.getRetryThrottling();
1✔
622
    } else {
1✔
623
      this.defaultServiceConfig = null;
1✔
624
    }
625
    this.lookUpServiceConfig = builder.lookUpServiceConfig;
1✔
626
    realChannel = new RealChannel(nameResolver.getServiceAuthority());
1✔
627
    Channel channel = realChannel;
1✔
628
    if (builder.binlog != null) {
1✔
629
      channel = builder.binlog.wrapChannel(channel);
1✔
630
    }
631
    this.interceptorChannel = ClientInterceptors.intercept(channel, interceptors);
1✔
632
    this.transportFilters = new ArrayList<>(builder.transportFilters);
1✔
633
    this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier");
1✔
634
    if (builder.idleTimeoutMillis == IDLE_TIMEOUT_MILLIS_DISABLE) {
1✔
635
      this.idleTimeoutMillis = builder.idleTimeoutMillis;
1✔
636
    } else {
637
      checkArgument(
1✔
638
          builder.idleTimeoutMillis
639
              >= ManagedChannelImplBuilder.IDLE_MODE_MIN_TIMEOUT_MILLIS,
640
          "invalid idleTimeoutMillis %s", builder.idleTimeoutMillis);
641
      this.idleTimeoutMillis = builder.idleTimeoutMillis;
1✔
642
    }
643

644
    idleTimer = new Rescheduler(
1✔
645
        new IdleModeTimer(),
646
        syncContext,
647
        transportFactory.getScheduledExecutorService(),
1✔
648
        stopwatchSupplier.get());
1✔
649
    this.fullStreamDecompression = builder.fullStreamDecompression;
1✔
650
    this.decompressorRegistry = checkNotNull(builder.decompressorRegistry, "decompressorRegistry");
1✔
651
    this.compressorRegistry = checkNotNull(builder.compressorRegistry, "compressorRegistry");
1✔
652
    this.userAgent = builder.userAgent;
1✔
653

654
    this.channelBufferLimit = builder.retryBufferSize;
1✔
655
    this.perRpcBufferLimit = builder.perRpcBufferLimit;
1✔
656
    final class ChannelCallTracerFactory implements CallTracer.Factory {
1✔
657
      @Override
658
      public CallTracer create() {
659
        return new CallTracer(timeProvider);
1✔
660
      }
661
    }
662

663
    this.callTracerFactory = new ChannelCallTracerFactory();
1✔
664
    channelCallTracer = callTracerFactory.create();
1✔
665
    this.channelz = checkNotNull(builder.channelz);
1✔
666
    channelz.addRootChannel(this);
1✔
667

668
    if (!lookUpServiceConfig) {
1✔
669
      if (defaultServiceConfig != null) {
1✔
670
        channelLogger.log(
1✔
671
            ChannelLogLevel.INFO, "Service config look-up disabled, using default service config");
672
      }
673
      serviceConfigUpdated = true;
1✔
674
    }
675
  }
1✔
676

677
  @VisibleForTesting
678
  static NameResolver getNameResolver(
679
      URI targetUri, @Nullable final String overrideAuthority,
680
      NameResolverProvider provider, NameResolver.Args nameResolverArgs) {
681
    NameResolver resolver = provider.newNameResolver(targetUri, nameResolverArgs);
1✔
682
    if (resolver == null) {
1✔
683
      throw new IllegalArgumentException("cannot create a NameResolver for " + targetUri);
1✔
684
    }
685

686
    // We wrap the name resolver in a RetryingNameResolver to give it the ability to retry failures.
687
    // TODO: After a transition period, all NameResolver implementations that need retry should use
688
    //       RetryingNameResolver directly and this step can be removed.
689
    NameResolver usedNameResolver = RetryingNameResolver.wrap(resolver, nameResolverArgs);
1✔
690

691
    if (overrideAuthority == null) {
1✔
692
      return usedNameResolver;
1✔
693
    }
694

695
    return new ForwardingNameResolver(usedNameResolver) {
1✔
696
      @Override
697
      public String getServiceAuthority() {
698
        return overrideAuthority;
1✔
699
      }
700
    };
701
  }
702

703
  @VisibleForTesting
704
  InternalConfigSelector getConfigSelector() {
705
    return realChannel.configSelector.get();
1✔
706
  }
707
  
708
  @VisibleForTesting
709
  boolean hasThrottle() {
710
    return this.transportProvider.throttle != null;
1✔
711
  }
712

713
  /**
714
   * Initiates an orderly shutdown in which preexisting calls continue but new calls are immediately
715
   * cancelled.
716
   */
717
  @Override
718
  public ManagedChannelImpl shutdown() {
719
    channelLogger.log(ChannelLogLevel.DEBUG, "shutdown() called");
1✔
720
    if (!shutdown.compareAndSet(false, true)) {
1✔
721
      return this;
1✔
722
    }
723
    final class Shutdown implements Runnable {
1✔
724
      @Override
725
      public void run() {
726
        channelLogger.log(ChannelLogLevel.INFO, "Entering SHUTDOWN state");
1✔
727
        channelStateManager.gotoState(SHUTDOWN);
1✔
728
      }
1✔
729
    }
730

731
    syncContext.execute(new Shutdown());
1✔
732
    realChannel.shutdown();
1✔
733
    final class CancelIdleTimer implements Runnable {
1✔
734
      @Override
735
      public void run() {
736
        cancelIdleTimer(/* permanent= */ true);
1✔
737
      }
1✔
738
    }
739

740
    syncContext.execute(new CancelIdleTimer());
1✔
741
    return this;
1✔
742
  }
743

744
  /**
745
   * Initiates a forceful shutdown in which preexisting and new calls are cancelled. Although
746
   * forceful, the shutdown process is still not instantaneous; {@link #isTerminated()} will likely
747
   * return {@code false} immediately after this method returns.
748
   */
749
  @Override
750
  public ManagedChannelImpl shutdownNow() {
751
    channelLogger.log(ChannelLogLevel.DEBUG, "shutdownNow() called");
1✔
752
    shutdown();
1✔
753
    realChannel.shutdownNow();
1✔
754
    final class ShutdownNow implements Runnable {
1✔
755
      @Override
756
      public void run() {
757
        if (shutdownNowed) {
1✔
758
          return;
1✔
759
        }
760
        shutdownNowed = true;
1✔
761
        maybeShutdownNowSubchannels();
1✔
762
      }
1✔
763
    }
764

765
    syncContext.execute(new ShutdownNow());
1✔
766
    return this;
1✔
767
  }
768

769
  // Called from syncContext
770
  @VisibleForTesting
771
  void panic(final Throwable t) {
772
    if (panicMode) {
1✔
773
      // Preserve the first panic information
774
      return;
×
775
    }
776
    panicMode = true;
1✔
777
    try {
778
      cancelIdleTimer(/* permanent= */ true);
1✔
779
      shutdownNameResolverAndLoadBalancer(false);
1✔
780
    } finally {
781
      updateSubchannelPicker(new LoadBalancer.FixedResultPicker(PickResult.withDrop(
1✔
782
          Status.INTERNAL.withDescription("Panic! This is a bug!").withCause(t))));
1✔
783
      realChannel.updateConfigSelector(null);
1✔
784
      channelLogger.log(ChannelLogLevel.ERROR, "PANIC! Entering TRANSIENT_FAILURE");
1✔
785
      channelStateManager.gotoState(TRANSIENT_FAILURE);
1✔
786
    }
787
  }
1✔
788

789
  @VisibleForTesting
790
  boolean isInPanicMode() {
791
    return panicMode;
1✔
792
  }
793

794
  // Called from syncContext
795
  private void updateSubchannelPicker(SubchannelPicker newPicker) {
796
    delayedTransport.reprocess(newPicker);
1✔
797
  }
1✔
798

799
  @Override
800
  public boolean isShutdown() {
801
    return shutdown.get();
1✔
802
  }
803

804
  @Override
805
  public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
806
    return terminatedLatch.await(timeout, unit);
1✔
807
  }
808

809
  @Override
810
  public boolean isTerminated() {
811
    return terminated;
1✔
812
  }
813

814
  /*
815
   * Creates a new outgoing call on the channel.
816
   */
817
  @Override
818
  public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(MethodDescriptor<ReqT, RespT> method,
819
      CallOptions callOptions) {
820
    return interceptorChannel.newCall(method, callOptions);
1✔
821
  }
822

823
  @Override
824
  public String authority() {
825
    return interceptorChannel.authority();
1✔
826
  }
827

828
  private Executor getCallExecutor(CallOptions callOptions) {
829
    Executor executor = callOptions.getExecutor();
1✔
830
    if (executor == null) {
1✔
831
      executor = this.executor;
1✔
832
    }
833
    return executor;
1✔
834
  }
835

836
  private class RealChannel extends Channel {
837
    // Reference to null if no config selector is available from resolution result
838
    // Reference must be set() from syncContext
839
    private final AtomicReference<InternalConfigSelector> configSelector =
1✔
840
        new AtomicReference<>(INITIAL_PENDING_SELECTOR);
1✔
841
    // Set when the NameResolver is initially created. When we create a new NameResolver for the
842
    // same target, the new instance must have the same value.
843
    private final String authority;
844

845
    private final Channel clientCallImplChannel = new Channel() {
1✔
846
      @Override
847
      public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
848
          MethodDescriptor<RequestT, ResponseT> method, CallOptions callOptions) {
849
        return new ClientCallImpl<>(
1✔
850
            method,
851
            getCallExecutor(callOptions),
1✔
852
            callOptions,
853
            transportProvider,
1✔
854
            terminated ? null : transportFactory.getScheduledExecutorService(),
1✔
855
            channelCallTracer,
1✔
856
            null)
857
            .setFullStreamDecompression(fullStreamDecompression)
1✔
858
            .setDecompressorRegistry(decompressorRegistry)
1✔
859
            .setCompressorRegistry(compressorRegistry);
1✔
860
      }
861

862
      @Override
863
      public String authority() {
864
        return authority;
×
865
      }
866
    };
867

868
    private RealChannel(String authority) {
1✔
869
      this.authority =  checkNotNull(authority, "authority");
1✔
870
    }
1✔
871

872
    @Override
873
    public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(
874
        MethodDescriptor<ReqT, RespT> method, CallOptions callOptions) {
875
      if (configSelector.get() != INITIAL_PENDING_SELECTOR) {
1✔
876
        return newClientCall(method, callOptions);
1✔
877
      }
878
      syncContext.execute(new Runnable() {
1✔
879
        @Override
880
        public void run() {
881
          exitIdleMode();
1✔
882
        }
1✔
883
      });
884
      if (configSelector.get() != INITIAL_PENDING_SELECTOR) {
1✔
885
        // This is an optimization for the case (typically with InProcessTransport) when name
886
        // resolution result is immediately available at this point. Otherwise, some users'
887
        // tests might observe slight behavior difference from earlier grpc versions.
888
        return newClientCall(method, callOptions);
1✔
889
      }
890
      if (shutdown.get()) {
1✔
891
        // Return a failing ClientCall.
892
        return new ClientCall<ReqT, RespT>() {
×
893
          @Override
894
          public void start(Listener<RespT> responseListener, Metadata headers) {
895
            responseListener.onClose(SHUTDOWN_STATUS, new Metadata());
×
896
          }
×
897

898
          @Override public void request(int numMessages) {}
×
899

900
          @Override public void cancel(@Nullable String message, @Nullable Throwable cause) {}
×
901

902
          @Override public void halfClose() {}
×
903

904
          @Override public void sendMessage(ReqT message) {}
×
905
        };
906
      }
907
      Context context = Context.current();
1✔
908
      final PendingCall<ReqT, RespT> pendingCall = new PendingCall<>(context, method, callOptions);
1✔
909
      syncContext.execute(new Runnable() {
1✔
910
        @Override
911
        public void run() {
912
          if (configSelector.get() == INITIAL_PENDING_SELECTOR) {
1✔
913
            if (pendingCalls == null) {
1✔
914
              pendingCalls = new LinkedHashSet<>();
1✔
915
              inUseStateAggregator.updateObjectInUse(pendingCallsInUseObject, true);
1✔
916
            }
917
            pendingCalls.add(pendingCall);
1✔
918
          } else {
919
            pendingCall.reprocess();
1✔
920
          }
921
        }
1✔
922
      });
923
      return pendingCall;
1✔
924
    }
925

926
    // Must run in SynchronizationContext.
927
    void updateConfigSelector(@Nullable InternalConfigSelector config) {
928
      InternalConfigSelector prevConfig = configSelector.get();
1✔
929
      configSelector.set(config);
1✔
930
      if (prevConfig == INITIAL_PENDING_SELECTOR && pendingCalls != null) {
1✔
931
        for (RealChannel.PendingCall<?, ?> pendingCall : pendingCalls) {
1✔
932
          pendingCall.reprocess();
1✔
933
        }
1✔
934
      }
935
    }
1✔
936

937
    // Must run in SynchronizationContext.
938
    void onConfigError() {
939
      if (configSelector.get() == INITIAL_PENDING_SELECTOR) {
1✔
940
        // Apply Default Service Config if initial name resolution fails.
941
        if (defaultServiceConfig != null) {
1✔
942
          updateConfigSelector(defaultServiceConfig.getDefaultConfigSelector());
1✔
943
          lastServiceConfig = defaultServiceConfig;
1✔
944
          channelLogger.log(ChannelLogLevel.ERROR,
1✔
945
              "Initial Name Resolution error, using default service config");
946
        } else {
947
          updateConfigSelector(null);
1✔
948
        }
949
      }
950
    }
1✔
951

952
    void shutdown() {
953
      final class RealChannelShutdown implements Runnable {
1✔
954
        @Override
955
        public void run() {
956
          if (pendingCalls == null) {
1✔
957
            if (configSelector.get() == INITIAL_PENDING_SELECTOR) {
1✔
958
              configSelector.set(null);
1✔
959
            }
960
            uncommittedRetriableStreamsRegistry.onShutdown(SHUTDOWN_STATUS);
1✔
961
          }
962
        }
1✔
963
      }
964

965
      syncContext.execute(new RealChannelShutdown());
1✔
966
    }
1✔
967

968
    void shutdownNow() {
969
      final class RealChannelShutdownNow implements Runnable {
1✔
970
        @Override
971
        public void run() {
972
          if (configSelector.get() == INITIAL_PENDING_SELECTOR) {
1✔
973
            configSelector.set(null);
1✔
974
          }
975
          if (pendingCalls != null) {
1✔
976
            for (RealChannel.PendingCall<?, ?> pendingCall : pendingCalls) {
1✔
977
              pendingCall.cancel("Channel is forcefully shutdown", null);
1✔
978
            }
1✔
979
          }
980
          uncommittedRetriableStreamsRegistry.onShutdownNow(SHUTDOWN_NOW_STATUS);
1✔
981
        }
1✔
982
      }
983

984
      syncContext.execute(new RealChannelShutdownNow());
1✔
985
    }
1✔
986

987
    @Override
988
    public String authority() {
989
      return authority;
1✔
990
    }
991

992
    private final class PendingCall<ReqT, RespT> extends DelayedClientCall<ReqT, RespT> {
993
      final Context context;
994
      final MethodDescriptor<ReqT, RespT> method;
995
      final CallOptions callOptions;
996
      private final long callCreationTime;
997

998
      PendingCall(
999
          Context context, MethodDescriptor<ReqT, RespT> method, CallOptions callOptions) {
1✔
1000
        super(getCallExecutor(callOptions), scheduledExecutor, callOptions.getDeadline());
1✔
1001
        this.context = context;
1✔
1002
        this.method = method;
1✔
1003
        this.callOptions = callOptions;
1✔
1004
        this.callCreationTime = ticker.nanoTime();
1✔
1005
      }
1✔
1006

1007
      /** Called when it's ready to create a real call and reprocess the pending call. */
1008
      void reprocess() {
1009
        ClientCall<ReqT, RespT> realCall;
1010
        Context previous = context.attach();
1✔
1011
        try {
1012
          CallOptions delayResolutionOption = callOptions.withOption(NAME_RESOLUTION_DELAYED,
1✔
1013
              ticker.nanoTime() - callCreationTime);
1✔
1014
          realCall = newClientCall(method, delayResolutionOption);
1✔
1015
        } finally {
1016
          context.detach(previous);
1✔
1017
        }
1018
        Runnable toRun = setCall(realCall);
1✔
1019
        if (toRun == null) {
1✔
1020
          syncContext.execute(new PendingCallRemoval());
1✔
1021
        } else {
1022
          getCallExecutor(callOptions).execute(new Runnable() {
1✔
1023
            @Override
1024
            public void run() {
1025
              toRun.run();
1✔
1026
              syncContext.execute(new PendingCallRemoval());
1✔
1027
            }
1✔
1028
          });
1029
        }
1030
      }
1✔
1031

1032
      @Override
1033
      protected void callCancelled() {
1034
        super.callCancelled();
1✔
1035
        syncContext.execute(new PendingCallRemoval());
1✔
1036
      }
1✔
1037

1038
      final class PendingCallRemoval implements Runnable {
1✔
1039
        @Override
1040
        public void run() {
1041
          if (pendingCalls != null) {
1✔
1042
            pendingCalls.remove(PendingCall.this);
1✔
1043
            if (pendingCalls.isEmpty()) {
1✔
1044
              inUseStateAggregator.updateObjectInUse(pendingCallsInUseObject, false);
1✔
1045
              pendingCalls = null;
1✔
1046
              if (shutdown.get()) {
1✔
1047
                uncommittedRetriableStreamsRegistry.onShutdown(SHUTDOWN_STATUS);
1✔
1048
              }
1049
            }
1050
          }
1051
        }
1✔
1052
      }
1053
    }
1054

1055
    private <ReqT, RespT> ClientCall<ReqT, RespT> newClientCall(
1056
        MethodDescriptor<ReqT, RespT> method, CallOptions callOptions) {
1057
      InternalConfigSelector selector = configSelector.get();
1✔
1058
      if (selector == null) {
1✔
1059
        return clientCallImplChannel.newCall(method, callOptions);
1✔
1060
      }
1061
      if (selector instanceof ServiceConfigConvertedSelector) {
1✔
1062
        MethodInfo methodInfo =
1✔
1063
            ((ServiceConfigConvertedSelector) selector).config.getMethodConfig(method);
1✔
1064
        if (methodInfo != null) {
1✔
1065
          callOptions = callOptions.withOption(MethodInfo.KEY, methodInfo);
1✔
1066
        }
1067
        return clientCallImplChannel.newCall(method, callOptions);
1✔
1068
      }
1069
      return new ConfigSelectingClientCall<>(
1✔
1070
          selector, clientCallImplChannel, executor, method, callOptions);
1✔
1071
    }
1072
  }
1073

1074
  /**
1075
   * A client call for a given channel that applies a given config selector when it starts.
1076
   */
1077
  static final class ConfigSelectingClientCall<ReqT, RespT>
1078
      extends ForwardingClientCall<ReqT, RespT> {
1079

1080
    private final InternalConfigSelector configSelector;
1081
    private final Channel channel;
1082
    private final Executor callExecutor;
1083
    private final MethodDescriptor<ReqT, RespT> method;
1084
    private final Context context;
1085
    private CallOptions callOptions;
1086

1087
    private ClientCall<ReqT, RespT> delegate;
1088

1089
    ConfigSelectingClientCall(
1090
        InternalConfigSelector configSelector, Channel channel, Executor channelExecutor,
1091
        MethodDescriptor<ReqT, RespT> method,
1092
        CallOptions callOptions) {
1✔
1093
      this.configSelector = configSelector;
1✔
1094
      this.channel = channel;
1✔
1095
      this.method = method;
1✔
1096
      this.callExecutor =
1✔
1097
          callOptions.getExecutor() == null ? channelExecutor : callOptions.getExecutor();
1✔
1098
      this.callOptions = callOptions.withExecutor(callExecutor);
1✔
1099
      this.context = Context.current();
1✔
1100
    }
1✔
1101

1102
    @Override
1103
    protected ClientCall<ReqT, RespT> delegate() {
1104
      return delegate;
1✔
1105
    }
1106

1107
    @SuppressWarnings("unchecked")
1108
    @Override
1109
    public void start(Listener<RespT> observer, Metadata headers) {
1110
      PickSubchannelArgs args =
1✔
1111
          new PickSubchannelArgsImpl(method, headers, callOptions, NOOP_PICK_DETAILS_CONSUMER);
1✔
1112
      InternalConfigSelector.Result result = configSelector.selectConfig(args);
1✔
1113
      Status status = result.getStatus();
1✔
1114
      if (!status.isOk()) {
1✔
1115
        executeCloseObserverInContext(observer,
1✔
1116
            GrpcUtil.replaceInappropriateControlPlaneStatus(status));
1✔
1117
        delegate = (ClientCall<ReqT, RespT>) NOOP_CALL;
1✔
1118
        return;
1✔
1119
      }
1120
      ClientInterceptor interceptor = result.getInterceptor();
1✔
1121
      ManagedChannelServiceConfig config = (ManagedChannelServiceConfig) result.getConfig();
1✔
1122
      MethodInfo methodInfo = config.getMethodConfig(method);
1✔
1123
      if (methodInfo != null) {
1✔
1124
        callOptions = callOptions.withOption(MethodInfo.KEY, methodInfo);
1✔
1125
      }
1126
      if (interceptor != null) {
1✔
1127
        delegate = interceptor.interceptCall(method, callOptions, channel);
1✔
1128
      } else {
1129
        delegate = channel.newCall(method, callOptions);
×
1130
      }
1131
      delegate.start(observer, headers);
1✔
1132
    }
1✔
1133

1134
    private void executeCloseObserverInContext(
1135
        final Listener<RespT> observer, final Status status) {
1136
      class CloseInContext extends ContextRunnable {
1137
        CloseInContext() {
1✔
1138
          super(context);
1✔
1139
        }
1✔
1140

1141
        @Override
1142
        public void runInContext() {
1143
          observer.onClose(status, new Metadata());
1✔
1144
        }
1✔
1145
      }
1146

1147
      callExecutor.execute(new CloseInContext());
1✔
1148
    }
1✔
1149

1150
    @Override
1151
    public void cancel(@Nullable String message, @Nullable Throwable cause) {
1152
      if (delegate != null) {
×
1153
        delegate.cancel(message, cause);
×
1154
      }
1155
    }
×
1156
  }
1157

1158
  private static final ClientCall<Object, Object> NOOP_CALL = new ClientCall<Object, Object>() {
1✔
1159
    @Override
1160
    public void start(Listener<Object> responseListener, Metadata headers) {}
×
1161

1162
    @Override
1163
    public void request(int numMessages) {}
1✔
1164

1165
    @Override
1166
    public void cancel(String message, Throwable cause) {}
×
1167

1168
    @Override
1169
    public void halfClose() {}
×
1170

1171
    @Override
1172
    public void sendMessage(Object message) {}
×
1173

1174
    // Always returns {@code false}, since this is only used when the startup of the call fails.
1175
    @Override
1176
    public boolean isReady() {
1177
      return false;
×
1178
    }
1179
  };
1180

1181
  /**
1182
   * Terminate the channel if termination conditions are met.
1183
   */
1184
  // Must be run from syncContext
1185
  private void maybeTerminateChannel() {
1186
    if (terminated) {
1✔
1187
      return;
×
1188
    }
1189
    if (shutdown.get() && subchannels.isEmpty() && oobChannels.isEmpty()) {
1✔
1190
      channelLogger.log(ChannelLogLevel.INFO, "Terminated");
1✔
1191
      channelz.removeRootChannel(this);
1✔
1192
      executorPool.returnObject(executor);
1✔
1193
      balancerRpcExecutorHolder.release();
1✔
1194
      offloadExecutorHolder.release();
1✔
1195
      // Release the transport factory so that it can deallocate any resources.
1196
      transportFactory.close();
1✔
1197

1198
      terminated = true;
1✔
1199
      terminatedLatch.countDown();
1✔
1200
    }
1201
  }
1✔
1202

1203
  // Must be called from syncContext
1204
  private void handleInternalSubchannelState(ConnectivityStateInfo newState) {
1205
    if (newState.getState() == TRANSIENT_FAILURE || newState.getState() == IDLE) {
1✔
1206
      refreshNameResolution();
1✔
1207
    }
1208
  }
1✔
1209

1210
  @Override
1211
  public ConnectivityState getState(boolean requestConnection) {
1212
    ConnectivityState savedChannelState = channelStateManager.getState();
1✔
1213
    if (requestConnection && savedChannelState == IDLE) {
1✔
1214
      final class RequestConnection implements Runnable {
1✔
1215
        @Override
1216
        public void run() {
1217
          exitIdleMode();
1✔
1218
          if (lbHelper != null) {
1✔
1219
            lbHelper.lb.requestConnection();
1✔
1220
          }
1221
        }
1✔
1222
      }
1223

1224
      syncContext.execute(new RequestConnection());
1✔
1225
    }
1226
    return savedChannelState;
1✔
1227
  }
1228

1229
  @Override
1230
  public void notifyWhenStateChanged(final ConnectivityState source, final Runnable callback) {
1231
    final class NotifyStateChanged implements Runnable {
1✔
1232
      @Override
1233
      public void run() {
1234
        channelStateManager.notifyWhenStateChanged(callback, executor, source);
1✔
1235
      }
1✔
1236
    }
1237

1238
    syncContext.execute(new NotifyStateChanged());
1✔
1239
  }
1✔
1240

1241
  @Override
1242
  public void resetConnectBackoff() {
1243
    final class ResetConnectBackoff implements Runnable {
1✔
1244
      @Override
1245
      public void run() {
1246
        if (shutdown.get()) {
1✔
1247
          return;
1✔
1248
        }
1249
        if (nameResolverStarted) {
1✔
1250
          refreshNameResolution();
1✔
1251
        }
1252
        for (InternalSubchannel subchannel : subchannels) {
1✔
1253
          subchannel.resetConnectBackoff();
1✔
1254
        }
1✔
1255
        for (OobChannel oobChannel : oobChannels) {
1✔
1256
          oobChannel.resetConnectBackoff();
×
1257
        }
×
1258
      }
1✔
1259
    }
1260

1261
    syncContext.execute(new ResetConnectBackoff());
1✔
1262
  }
1✔
1263

1264
  @Override
1265
  public void enterIdle() {
1266
    final class PrepareToLoseNetworkRunnable implements Runnable {
1✔
1267
      @Override
1268
      public void run() {
1269
        if (shutdown.get() || lbHelper == null) {
1✔
1270
          return;
1✔
1271
        }
1272
        cancelIdleTimer(/* permanent= */ false);
1✔
1273
        enterIdleMode();
1✔
1274
      }
1✔
1275
    }
1276

1277
    syncContext.execute(new PrepareToLoseNetworkRunnable());
1✔
1278
  }
1✔
1279

1280
  /**
1281
   * A registry that prevents channel shutdown from killing existing retry attempts that are in
1282
   * backoff.
1283
   */
1284
  private final class UncommittedRetriableStreamsRegistry {
1✔
1285
    // TODO(zdapeng): This means we would acquire a lock for each new retry-able stream,
1286
    // it's worthwhile to look for a lock-free approach.
1287
    final Object lock = new Object();
1✔
1288

1289
    @GuardedBy("lock")
1✔
1290
    Collection<ClientStream> uncommittedRetriableStreams = new HashSet<>();
1291

1292
    @GuardedBy("lock")
1293
    Status shutdownStatus;
1294

1295
    void onShutdown(Status reason) {
1296
      boolean shouldShutdownDelayedTransport = false;
1✔
1297
      synchronized (lock) {
1✔
1298
        if (shutdownStatus != null) {
1✔
1299
          return;
1✔
1300
        }
1301
        shutdownStatus = reason;
1✔
1302
        // Keep the delayedTransport open until there is no more uncommitted streams, b/c those
1303
        // retriable streams, which may be in backoff and not using any transport, are already
1304
        // started RPCs.
1305
        if (uncommittedRetriableStreams.isEmpty()) {
1✔
1306
          shouldShutdownDelayedTransport = true;
1✔
1307
        }
1308
      }
1✔
1309

1310
      if (shouldShutdownDelayedTransport) {
1✔
1311
        delayedTransport.shutdown(reason);
1✔
1312
      }
1313
    }
1✔
1314

1315
    void onShutdownNow(Status reason) {
1316
      onShutdown(reason);
1✔
1317
      Collection<ClientStream> streams;
1318

1319
      synchronized (lock) {
1✔
1320
        streams = new ArrayList<>(uncommittedRetriableStreams);
1✔
1321
      }
1✔
1322

1323
      for (ClientStream stream : streams) {
1✔
1324
        stream.cancel(reason);
1✔
1325
      }
1✔
1326
      delayedTransport.shutdownNow(reason);
1✔
1327
    }
1✔
1328

1329
    /**
1330
     * Registers a RetriableStream and return null if not shutdown, otherwise just returns the
1331
     * shutdown Status.
1332
     */
1333
    @Nullable
1334
    Status add(RetriableStream<?> retriableStream) {
1335
      synchronized (lock) {
1✔
1336
        if (shutdownStatus != null) {
1✔
1337
          return shutdownStatus;
1✔
1338
        }
1339
        uncommittedRetriableStreams.add(retriableStream);
1✔
1340
        return null;
1✔
1341
      }
1342
    }
1343

1344
    void remove(RetriableStream<?> retriableStream) {
1345
      Status shutdownStatusCopy = null;
1✔
1346

1347
      synchronized (lock) {
1✔
1348
        uncommittedRetriableStreams.remove(retriableStream);
1✔
1349
        if (uncommittedRetriableStreams.isEmpty()) {
1✔
1350
          shutdownStatusCopy = shutdownStatus;
1✔
1351
          // Because retriable transport is long-lived, we take this opportunity to down-size the
1352
          // hashmap.
1353
          uncommittedRetriableStreams = new HashSet<>();
1✔
1354
        }
1355
      }
1✔
1356

1357
      if (shutdownStatusCopy != null) {
1✔
1358
        delayedTransport.shutdown(shutdownStatusCopy);
1✔
1359
      }
1360
    }
1✔
1361
  }
1362

1363
  private final class LbHelperImpl extends LoadBalancer.Helper {
1✔
1364
    AutoConfiguredLoadBalancer lb;
1365

1366
    @Override
1367
    public AbstractSubchannel createSubchannel(CreateSubchannelArgs args) {
1368
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1369
      // No new subchannel should be created after load balancer has been shutdown.
1370
      checkState(!terminating, "Channel is being terminated");
1✔
1371
      return new SubchannelImpl(args);
1✔
1372
    }
1373

1374
    @Override
1375
    public void updateBalancingState(
1376
        final ConnectivityState newState, final SubchannelPicker newPicker) {
1377
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1378
      checkNotNull(newState, "newState");
1✔
1379
      checkNotNull(newPicker, "newPicker");
1✔
1380

1381
      if (LbHelperImpl.this != lbHelper || panicMode) {
1✔
1382
        return;
1✔
1383
      }
1384
      updateSubchannelPicker(newPicker);
1✔
1385
      // It's not appropriate to report SHUTDOWN state from lb.
1386
      // Ignore the case of newState == SHUTDOWN for now.
1387
      if (newState != SHUTDOWN) {
1✔
1388
        channelLogger.log(
1✔
1389
            ChannelLogLevel.INFO, "Entering {0} state with picker: {1}", newState, newPicker);
1390
        channelStateManager.gotoState(newState);
1✔
1391
      }
1392
    }
1✔
1393

1394
    @Override
1395
    public void refreshNameResolution() {
1396
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1397
      final class LoadBalancerRefreshNameResolution implements Runnable {
1✔
1398
        @Override
1399
        public void run() {
1400
          ManagedChannelImpl.this.refreshNameResolution();
1✔
1401
        }
1✔
1402
      }
1403

1404
      syncContext.execute(new LoadBalancerRefreshNameResolution());
1✔
1405
    }
1✔
1406

1407
    @Override
1408
    public ManagedChannel createOobChannel(EquivalentAddressGroup addressGroup, String authority) {
1409
      return createOobChannel(Collections.singletonList(addressGroup), authority);
×
1410
    }
1411

1412
    @Override
1413
    public ManagedChannel createOobChannel(List<EquivalentAddressGroup> addressGroup,
1414
        String authority) {
1415
      // TODO(ejona): can we be even stricter? Like terminating?
1416
      checkState(!terminated, "Channel is terminated");
1✔
1417
      long oobChannelCreationTime = timeProvider.currentTimeNanos();
1✔
1418
      InternalLogId oobLogId = InternalLogId.allocate("OobChannel", /*details=*/ null);
1✔
1419
      InternalLogId subchannelLogId =
1✔
1420
          InternalLogId.allocate("Subchannel-OOB", /*details=*/ authority);
1✔
1421
      ChannelTracer oobChannelTracer =
1✔
1422
          new ChannelTracer(
1423
              oobLogId, maxTraceEvents, oobChannelCreationTime,
1✔
1424
              "OobChannel for " + addressGroup);
1425
      final OobChannel oobChannel = new OobChannel(
1✔
1426
          authority, balancerRpcExecutorPool, oobTransportFactory.getScheduledExecutorService(),
1✔
1427
          syncContext, callTracerFactory.create(), oobChannelTracer, channelz, timeProvider);
1✔
1428
      channelTracer.reportEvent(new ChannelTrace.Event.Builder()
1✔
1429
          .setDescription("Child OobChannel created")
1✔
1430
          .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
1✔
1431
          .setTimestampNanos(oobChannelCreationTime)
1✔
1432
          .setChannelRef(oobChannel)
1✔
1433
          .build());
1✔
1434
      ChannelTracer subchannelTracer =
1✔
1435
          new ChannelTracer(subchannelLogId, maxTraceEvents, oobChannelCreationTime,
1✔
1436
              "Subchannel for " + addressGroup);
1437
      ChannelLogger subchannelLogger = new ChannelLoggerImpl(subchannelTracer, timeProvider);
1✔
1438
      final class ManagedOobChannelCallback extends InternalSubchannel.Callback {
1✔
1439
        @Override
1440
        void onTerminated(InternalSubchannel is) {
1441
          oobChannels.remove(oobChannel);
1✔
1442
          channelz.removeSubchannel(is);
1✔
1443
          oobChannel.handleSubchannelTerminated();
1✔
1444
          maybeTerminateChannel();
1✔
1445
        }
1✔
1446

1447
        @Override
1448
        void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) {
1449
          // TODO(chengyuanzhang): change to let LB policies explicitly manage OOB channel's
1450
          //  state and refresh name resolution if necessary.
1451
          handleInternalSubchannelState(newState);
1✔
1452
          oobChannel.handleSubchannelStateChange(newState);
1✔
1453
        }
1✔
1454
      }
1455

1456
      final InternalSubchannel internalSubchannel = new InternalSubchannel(
1✔
1457
          CreateSubchannelArgs.newBuilder().setAddresses(addressGroup).build(),
1✔
1458
          authority, userAgent, backoffPolicyProvider, oobTransportFactory,
1✔
1459
          oobTransportFactory.getScheduledExecutorService(), stopwatchSupplier, syncContext,
1✔
1460
          // All callback methods are run from syncContext
1461
          new ManagedOobChannelCallback(),
1462
          channelz,
1✔
1463
          callTracerFactory.create(),
1✔
1464
          subchannelTracer,
1465
          subchannelLogId,
1466
          subchannelLogger,
1467
          transportFilters,
1✔
1468
          target,
1✔
1469
          lbHelper.getMetricRecorder());
1✔
1470
      oobChannelTracer.reportEvent(new ChannelTrace.Event.Builder()
1✔
1471
          .setDescription("Child Subchannel created")
1✔
1472
          .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
1✔
1473
          .setTimestampNanos(oobChannelCreationTime)
1✔
1474
          .setSubchannelRef(internalSubchannel)
1✔
1475
          .build());
1✔
1476
      channelz.addSubchannel(oobChannel);
1✔
1477
      channelz.addSubchannel(internalSubchannel);
1✔
1478
      oobChannel.setSubchannel(internalSubchannel);
1✔
1479
      final class AddOobChannel implements Runnable {
1✔
1480
        @Override
1481
        public void run() {
1482
          if (terminating) {
1✔
1483
            oobChannel.shutdown();
×
1484
          }
1485
          if (!terminated) {
1✔
1486
            // If channel has not terminated, it will track the subchannel and block termination
1487
            // for it.
1488
            oobChannels.add(oobChannel);
1✔
1489
          }
1490
        }
1✔
1491
      }
1492

1493
      syncContext.execute(new AddOobChannel());
1✔
1494
      return oobChannel;
1✔
1495
    }
1496

1497
    @Deprecated
1498
    @Override
1499
    public ManagedChannelBuilder<?> createResolvingOobChannelBuilder(String target) {
1500
      return createResolvingOobChannelBuilder(target, new DefaultChannelCreds())
1✔
1501
          // Override authority to keep the old behavior.
1502
          // createResolvingOobChannelBuilder(String target) will be deleted soon.
1503
          .overrideAuthority(getAuthority());
1✔
1504
    }
1505

1506
    // TODO(creamsoup) prevent the main channel from shutting down while an oob channel is not terminated
1507
    // TODO(zdapeng) register the channel as a subchannel of the parent channel in channelz.
1508
    @Override
1509
    public ManagedChannelBuilder<?> createResolvingOobChannelBuilder(
1510
        final String target, final ChannelCredentials channelCreds) {
1511
      checkNotNull(channelCreds, "channelCreds");
1✔
1512

1513
      final class ResolvingOobChannelBuilder
1514
          extends ForwardingChannelBuilder2<ResolvingOobChannelBuilder> {
1515
        final ManagedChannelBuilder<?> delegate;
1516

1517
        ResolvingOobChannelBuilder() {
1✔
1518
          final ClientTransportFactory transportFactory;
1519
          CallCredentials callCredentials;
1520
          if (channelCreds instanceof DefaultChannelCreds) {
1✔
1521
            transportFactory = originalTransportFactory;
1✔
1522
            callCredentials = null;
1✔
1523
          } else {
1524
            SwapChannelCredentialsResult swapResult =
1✔
1525
                originalTransportFactory.swapChannelCredentials(channelCreds);
1✔
1526
            if (swapResult == null) {
1✔
1527
              delegate = Grpc.newChannelBuilder(target, channelCreds);
×
1528
              return;
×
1529
            } else {
1530
              transportFactory = swapResult.transportFactory;
1✔
1531
              callCredentials = swapResult.callCredentials;
1✔
1532
            }
1533
          }
1534
          ClientTransportFactoryBuilder transportFactoryBuilder =
1✔
1535
              new ClientTransportFactoryBuilder() {
1✔
1536
                @Override
1537
                public ClientTransportFactory buildClientTransportFactory() {
1538
                  return transportFactory;
1✔
1539
                }
1540
              };
1541
          delegate = new ManagedChannelImplBuilder(
1✔
1542
              target,
1543
              channelCreds,
1544
              callCredentials,
1545
              transportFactoryBuilder,
1546
              new FixedPortProvider(nameResolverArgs.getDefaultPort()))
1✔
1547
              .nameResolverRegistry(nameResolverRegistry);
1✔
1548
        }
1✔
1549

1550
        @Override
1551
        protected ManagedChannelBuilder<?> delegate() {
1552
          return delegate;
1✔
1553
        }
1554
      }
1555

1556
      checkState(!terminated, "Channel is terminated");
1✔
1557

1558
      ResolvingOobChannelBuilder builder = new ResolvingOobChannelBuilder();
1✔
1559

1560
      return builder
1✔
1561
          // TODO(zdapeng): executors should not outlive the parent channel.
1562
          .executor(executor)
1✔
1563
          .offloadExecutor(offloadExecutorHolder.getExecutor())
1✔
1564
          .maxTraceEvents(maxTraceEvents)
1✔
1565
          .proxyDetector(nameResolverArgs.getProxyDetector())
1✔
1566
          .userAgent(userAgent);
1✔
1567
    }
1568

1569
    @Override
1570
    public ChannelCredentials getUnsafeChannelCredentials() {
1571
      if (originalChannelCreds == null) {
1✔
1572
        return new DefaultChannelCreds();
1✔
1573
      }
1574
      return originalChannelCreds;
×
1575
    }
1576

1577
    @Override
1578
    public void updateOobChannelAddresses(ManagedChannel channel, EquivalentAddressGroup eag) {
1579
      updateOobChannelAddresses(channel, Collections.singletonList(eag));
×
1580
    }
×
1581

1582
    @Override
1583
    public void updateOobChannelAddresses(ManagedChannel channel,
1584
        List<EquivalentAddressGroup> eag) {
1585
      checkArgument(channel instanceof OobChannel,
1✔
1586
          "channel must have been returned from createOobChannel");
1587
      ((OobChannel) channel).updateAddresses(eag);
1✔
1588
    }
1✔
1589

1590
    @Override
1591
    public String getAuthority() {
1592
      return ManagedChannelImpl.this.authority();
1✔
1593
    }
1594

1595
    @Override
1596
    public String getChannelTarget() {
1597
      return targetUri.toString();
1✔
1598
    }
1599

1600
    @Override
1601
    public SynchronizationContext getSynchronizationContext() {
1602
      return syncContext;
1✔
1603
    }
1604

1605
    @Override
1606
    public ScheduledExecutorService getScheduledExecutorService() {
1607
      return scheduledExecutor;
1✔
1608
    }
1609

1610
    @Override
1611
    public ChannelLogger getChannelLogger() {
1612
      return channelLogger;
1✔
1613
    }
1614

1615
    @Override
1616
    public NameResolver.Args getNameResolverArgs() {
1617
      return nameResolverArgs;
1✔
1618
    }
1619

1620
    @Override
1621
    public NameResolverRegistry getNameResolverRegistry() {
1622
      return nameResolverRegistry;
1✔
1623
    }
1624

1625
    @Override
1626
    public MetricRecorder getMetricRecorder() {
1627
      return metricRecorder;
1✔
1628
    }
1629

1630
    /**
1631
     * A placeholder for channel creds if user did not specify channel creds for the channel.
1632
     */
1633
    // TODO(zdapeng): get rid of this class and let all ChannelBuilders always provide a non-null
1634
    //     channel creds.
1635
    final class DefaultChannelCreds extends ChannelCredentials {
1✔
1636
      @Override
1637
      public ChannelCredentials withoutBearerTokens() {
1638
        return this;
×
1639
      }
1640
    }
1641
  }
1642

1643
  final class NameResolverListener extends NameResolver.Listener2 {
1644
    final LbHelperImpl helper;
1645
    final NameResolver resolver;
1646

1647
    NameResolverListener(LbHelperImpl helperImpl, NameResolver resolver) {
1✔
1648
      this.helper = checkNotNull(helperImpl, "helperImpl");
1✔
1649
      this.resolver = checkNotNull(resolver, "resolver");
1✔
1650
    }
1✔
1651

1652
    @Override
1653
    public void onResult(final ResolutionResult resolutionResult) {
1654
      syncContext.execute(() -> onResult2(resolutionResult));
×
1655
    }
×
1656

1657
    @SuppressWarnings("ReferenceEquality")
1658
    @Override
1659
    public Status onResult2(final ResolutionResult resolutionResult) {
1660
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1661
      if (ManagedChannelImpl.this.nameResolver != resolver) {
1✔
1662
        return Status.OK;
1✔
1663
      }
1664

1665
      StatusOr<List<EquivalentAddressGroup>> serversOrError =
1✔
1666
          resolutionResult.getAddressesOrError();
1✔
1667
      if (!serversOrError.hasValue()) {
1✔
1668
        handleErrorInSyncContext(serversOrError.getStatus());
1✔
1669
        return serversOrError.getStatus();
1✔
1670
      }
1671
      List<EquivalentAddressGroup> servers = serversOrError.getValue();
1✔
1672
      channelLogger.log(
1✔
1673
          ChannelLogLevel.DEBUG,
1674
          "Resolved address: {0}, config={1}",
1675
          servers,
1676
          resolutionResult.getAttributes());
1✔
1677

1678
      if (lastResolutionState != ResolutionState.SUCCESS) {
1✔
1679
        channelLogger.log(ChannelLogLevel.INFO, "Address resolved: {0}",
1✔
1680
            servers);
1681
        lastResolutionState = ResolutionState.SUCCESS;
1✔
1682
      }
1683
      ConfigOrError configOrError = resolutionResult.getServiceConfig();
1✔
1684
      InternalConfigSelector resolvedConfigSelector =
1✔
1685
          resolutionResult.getAttributes().get(InternalConfigSelector.KEY);
1✔
1686
      ManagedChannelServiceConfig validServiceConfig =
1687
          configOrError != null && configOrError.getConfig() != null
1✔
1688
              ? (ManagedChannelServiceConfig) configOrError.getConfig()
1✔
1689
              : null;
1✔
1690
      Status serviceConfigError = configOrError != null ? configOrError.getError() : null;
1✔
1691

1692
      ManagedChannelServiceConfig effectiveServiceConfig;
1693
      if (!lookUpServiceConfig) {
1✔
1694
        if (validServiceConfig != null) {
1✔
1695
          channelLogger.log(
1✔
1696
              ChannelLogLevel.INFO,
1697
              "Service config from name resolver discarded by channel settings");
1698
        }
1699
        effectiveServiceConfig =
1700
            defaultServiceConfig == null ? EMPTY_SERVICE_CONFIG : defaultServiceConfig;
1✔
1701
        if (resolvedConfigSelector != null) {
1✔
1702
          channelLogger.log(
1✔
1703
              ChannelLogLevel.INFO,
1704
              "Config selector from name resolver discarded by channel settings");
1705
        }
1706
        realChannel.updateConfigSelector(effectiveServiceConfig.getDefaultConfigSelector());
1✔
1707
      } else {
1708
        // Try to use config if returned from name resolver
1709
        // Otherwise, try to use the default config if available
1710
        if (validServiceConfig != null) {
1✔
1711
          effectiveServiceConfig = validServiceConfig;
1✔
1712
          if (resolvedConfigSelector != null) {
1✔
1713
            realChannel.updateConfigSelector(resolvedConfigSelector);
1✔
1714
            if (effectiveServiceConfig.getDefaultConfigSelector() != null) {
1✔
1715
              channelLogger.log(
×
1716
                  ChannelLogLevel.DEBUG,
1717
                  "Method configs in service config will be discarded due to presence of"
1718
                      + "config-selector");
1719
            }
1720
          } else {
1721
            realChannel.updateConfigSelector(effectiveServiceConfig.getDefaultConfigSelector());
1✔
1722
          }
1723
        } else if (defaultServiceConfig != null) {
1✔
1724
          effectiveServiceConfig = defaultServiceConfig;
1✔
1725
          realChannel.updateConfigSelector(effectiveServiceConfig.getDefaultConfigSelector());
1✔
1726
          channelLogger.log(
1✔
1727
              ChannelLogLevel.INFO,
1728
              "Received no service config, using default service config");
1729
        } else if (serviceConfigError != null) {
1✔
1730
          if (!serviceConfigUpdated) {
1✔
1731
            // First DNS lookup has invalid service config, and cannot fall back to default
1732
            channelLogger.log(
1✔
1733
                ChannelLogLevel.INFO,
1734
                "Fallback to error due to invalid first service config without default config");
1735
            // This error could be an "inappropriate" control plane error that should not bleed
1736
            // through to client code using gRPC. We let them flow through here to the LB as
1737
            // we later check for these error codes when investigating pick results in
1738
            // GrpcUtil.getTransportFromPickResult().
1739
            onError(configOrError.getError());
1✔
1740
            return configOrError.getError();
1✔
1741
          } else {
1742
            effectiveServiceConfig = lastServiceConfig;
1✔
1743
          }
1744
        } else {
1745
          effectiveServiceConfig = EMPTY_SERVICE_CONFIG;
1✔
1746
          realChannel.updateConfigSelector(null);
1✔
1747
        }
1748
        if (!effectiveServiceConfig.equals(lastServiceConfig)) {
1✔
1749
          channelLogger.log(
1✔
1750
              ChannelLogLevel.INFO,
1751
              "Service config changed{0}",
1752
              effectiveServiceConfig == EMPTY_SERVICE_CONFIG ? " to empty" : "");
1✔
1753
          lastServiceConfig = effectiveServiceConfig;
1✔
1754
          transportProvider.throttle = effectiveServiceConfig.getRetryThrottling();
1✔
1755
        }
1756

1757
        try {
1758
          // TODO(creamsoup): when `serversOrError` is empty and lastResolutionStateCopy == SUCCESS
1759
          //  and lbNeedAddress, it shouldn't call the handleServiceConfigUpdate. But,
1760
          //  lbNeedAddress is not deterministic
1761
          serviceConfigUpdated = true;
1✔
1762
        } catch (RuntimeException re) {
×
1763
          logger.log(
×
1764
              Level.WARNING,
1765
              "[" + getLogId() + "] Unexpected exception from parsing service config",
×
1766
              re);
1767
        }
1✔
1768
      }
1769

1770
      Attributes effectiveAttrs = resolutionResult.getAttributes();
1✔
1771
      // Call LB only if it's not shutdown.  If LB is shutdown, lbHelper won't match.
1772
      if (NameResolverListener.this.helper == ManagedChannelImpl.this.lbHelper) {
1✔
1773
        Attributes.Builder attrBuilder =
1✔
1774
            effectiveAttrs.toBuilder().discard(InternalConfigSelector.KEY);
1✔
1775
        Map<String, ?> healthCheckingConfig =
1✔
1776
            effectiveServiceConfig.getHealthCheckingConfig();
1✔
1777
        if (healthCheckingConfig != null) {
1✔
1778
          attrBuilder
1✔
1779
              .set(LoadBalancer.ATTR_HEALTH_CHECKING_CONFIG, healthCheckingConfig)
1✔
1780
              .build();
1✔
1781
        }
1782
        Attributes attributes = attrBuilder.build();
1✔
1783

1784
        ResolvedAddresses.Builder resolvedAddresses = ResolvedAddresses.newBuilder()
1✔
1785
            .setAddresses(serversOrError.getValue())
1✔
1786
            .setAttributes(attributes)
1✔
1787
            .setLoadBalancingPolicyConfig(effectiveServiceConfig.getLoadBalancingConfig());
1✔
1788
        Status addressAcceptanceStatus = helper.lb.tryAcceptResolvedAddresses(
1✔
1789
            resolvedAddresses.build());
1✔
1790
        return addressAcceptanceStatus;
1✔
1791
      }
1792
      return Status.OK;
×
1793
    }
1794

1795
    @Override
1796
    public void onError(final Status error) {
1797
      checkArgument(!error.isOk(), "the error status must not be OK");
1✔
1798
      final class NameResolverErrorHandler implements Runnable {
1✔
1799
        @Override
1800
        public void run() {
1801
          handleErrorInSyncContext(error);
1✔
1802
        }
1✔
1803
      }
1804

1805
      syncContext.execute(new NameResolverErrorHandler());
1✔
1806
    }
1✔
1807

1808
    private void handleErrorInSyncContext(Status error) {
1809
      logger.log(Level.WARNING, "[{0}] Failed to resolve name. status={1}",
1✔
1810
          new Object[] {getLogId(), error});
1✔
1811
      realChannel.onConfigError();
1✔
1812
      if (lastResolutionState != ResolutionState.ERROR) {
1✔
1813
        channelLogger.log(ChannelLogLevel.WARNING, "Failed to resolve name: {0}", error);
1✔
1814
        lastResolutionState = ResolutionState.ERROR;
1✔
1815
      }
1816
      // Call LB only if it's not shutdown.  If LB is shutdown, lbHelper won't match.
1817
      if (NameResolverListener.this.helper != ManagedChannelImpl.this.lbHelper) {
1✔
1818
        return;
1✔
1819
      }
1820

1821
      helper.lb.handleNameResolutionError(error);
1✔
1822
    }
1✔
1823
  }
1824

1825
  private final class SubchannelImpl extends AbstractSubchannel {
1826
    final CreateSubchannelArgs args;
1827
    final InternalLogId subchannelLogId;
1828
    final ChannelLoggerImpl subchannelLogger;
1829
    final ChannelTracer subchannelTracer;
1830
    List<EquivalentAddressGroup> addressGroups;
1831
    InternalSubchannel subchannel;
1832
    boolean started;
1833
    boolean shutdown;
1834
    ScheduledHandle delayedShutdownTask;
1835

1836
    SubchannelImpl(CreateSubchannelArgs args) {
1✔
1837
      checkNotNull(args, "args");
1✔
1838
      addressGroups = args.getAddresses();
1✔
1839
      if (authorityOverride != null) {
1✔
1840
        List<EquivalentAddressGroup> eagsWithoutOverrideAttr =
1✔
1841
            stripOverrideAuthorityAttributes(args.getAddresses());
1✔
1842
        args = args.toBuilder().setAddresses(eagsWithoutOverrideAttr).build();
1✔
1843
      }
1844
      this.args = args;
1✔
1845
      subchannelLogId = InternalLogId.allocate("Subchannel", /*details=*/ authority());
1✔
1846
      subchannelTracer = new ChannelTracer(
1✔
1847
          subchannelLogId, maxTraceEvents, timeProvider.currentTimeNanos(),
1✔
1848
          "Subchannel for " + args.getAddresses());
1✔
1849
      subchannelLogger = new ChannelLoggerImpl(subchannelTracer, timeProvider);
1✔
1850
    }
1✔
1851

1852
    @Override
1853
    public void start(final SubchannelStateListener listener) {
1854
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1855
      checkState(!started, "already started");
1✔
1856
      checkState(!shutdown, "already shutdown");
1✔
1857
      checkState(!terminating, "Channel is being terminated");
1✔
1858
      started = true;
1✔
1859
      final class ManagedInternalSubchannelCallback extends InternalSubchannel.Callback {
1✔
1860
        // All callbacks are run in syncContext
1861
        @Override
1862
        void onTerminated(InternalSubchannel is) {
1863
          subchannels.remove(is);
1✔
1864
          channelz.removeSubchannel(is);
1✔
1865
          maybeTerminateChannel();
1✔
1866
        }
1✔
1867

1868
        @Override
1869
        void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) {
1870
          checkState(listener != null, "listener is null");
1✔
1871
          listener.onSubchannelState(newState);
1✔
1872
        }
1✔
1873

1874
        @Override
1875
        void onInUse(InternalSubchannel is) {
1876
          inUseStateAggregator.updateObjectInUse(is, true);
1✔
1877
        }
1✔
1878

1879
        @Override
1880
        void onNotInUse(InternalSubchannel is) {
1881
          inUseStateAggregator.updateObjectInUse(is, false);
1✔
1882
        }
1✔
1883
      }
1884

1885
      final InternalSubchannel internalSubchannel = new InternalSubchannel(
1✔
1886
          args,
1887
          authority(),
1✔
1888
          userAgent,
1✔
1889
          backoffPolicyProvider,
1✔
1890
          transportFactory,
1✔
1891
          transportFactory.getScheduledExecutorService(),
1✔
1892
          stopwatchSupplier,
1✔
1893
          syncContext,
1894
          new ManagedInternalSubchannelCallback(),
1895
          channelz,
1✔
1896
          callTracerFactory.create(),
1✔
1897
          subchannelTracer,
1898
          subchannelLogId,
1899
          subchannelLogger,
1900
          transportFilters, target,
1✔
1901
          lbHelper.getMetricRecorder());
1✔
1902

1903
      channelTracer.reportEvent(new ChannelTrace.Event.Builder()
1✔
1904
          .setDescription("Child Subchannel started")
1✔
1905
          .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
1✔
1906
          .setTimestampNanos(timeProvider.currentTimeNanos())
1✔
1907
          .setSubchannelRef(internalSubchannel)
1✔
1908
          .build());
1✔
1909

1910
      this.subchannel = internalSubchannel;
1✔
1911
      channelz.addSubchannel(internalSubchannel);
1✔
1912
      subchannels.add(internalSubchannel);
1✔
1913
    }
1✔
1914

1915
    @Override
1916
    InternalInstrumented<ChannelStats> getInstrumentedInternalSubchannel() {
1917
      checkState(started, "not started");
1✔
1918
      return subchannel;
1✔
1919
    }
1920

1921
    @Override
1922
    public void shutdown() {
1923
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1924
      if (subchannel == null) {
1✔
1925
        // start() was not successful
1926
        shutdown = true;
×
1927
        return;
×
1928
      }
1929
      if (shutdown) {
1✔
1930
        if (terminating && delayedShutdownTask != null) {
1✔
1931
          // shutdown() was previously called when terminating == false, thus a delayed shutdown()
1932
          // was scheduled.  Now since terminating == true, We should expedite the shutdown.
1933
          delayedShutdownTask.cancel();
×
1934
          delayedShutdownTask = null;
×
1935
          // Will fall through to the subchannel.shutdown() at the end.
1936
        } else {
1937
          return;
1✔
1938
        }
1939
      } else {
1940
        shutdown = true;
1✔
1941
      }
1942
      // Add a delay to shutdown to deal with the race between 1) a transport being picked and
1943
      // newStream() being called on it, and 2) its Subchannel is shut down by LoadBalancer (e.g.,
1944
      // because of address change, or because LoadBalancer is shutdown by Channel entering idle
1945
      // mode). If (2) wins, the app will see a spurious error. We work around this by delaying
1946
      // shutdown of Subchannel for a few seconds here.
1947
      //
1948
      // TODO(zhangkun83): consider a better approach
1949
      // (https://github.com/grpc/grpc-java/issues/2562).
1950
      if (!terminating) {
1✔
1951
        final class ShutdownSubchannel implements Runnable {
1✔
1952
          @Override
1953
          public void run() {
1954
            subchannel.shutdown(SUBCHANNEL_SHUTDOWN_STATUS);
1✔
1955
          }
1✔
1956
        }
1957

1958
        delayedShutdownTask = syncContext.schedule(
1✔
1959
            new LogExceptionRunnable(new ShutdownSubchannel()),
1960
            SUBCHANNEL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS,
1961
            transportFactory.getScheduledExecutorService());
1✔
1962
        return;
1✔
1963
      }
1964
      // When terminating == true, no more real streams will be created. It's safe and also
1965
      // desirable to shutdown timely.
1966
      subchannel.shutdown(SHUTDOWN_STATUS);
1✔
1967
    }
1✔
1968

1969
    @Override
1970
    public void requestConnection() {
1971
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1972
      checkState(started, "not started");
1✔
1973
      if (shutdown) {
1✔
1974
        return;
1✔
1975
      }
1976
      subchannel.obtainActiveTransport();
1✔
1977
    }
1✔
1978

1979
    @Override
1980
    public List<EquivalentAddressGroup> getAllAddresses() {
1981
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1982
      checkState(started, "not started");
1✔
1983
      return addressGroups;
1✔
1984
    }
1985

1986
    @Override
1987
    public Attributes getAttributes() {
1988
      return args.getAttributes();
1✔
1989
    }
1990

1991
    @Override
1992
    public String toString() {
1993
      return subchannelLogId.toString();
1✔
1994
    }
1995

1996
    @Override
1997
    public Channel asChannel() {
1998
      checkState(started, "not started");
1✔
1999
      return new SubchannelChannel(
1✔
2000
          subchannel, balancerRpcExecutorHolder.getExecutor(),
1✔
2001
          transportFactory.getScheduledExecutorService(),
1✔
2002
          callTracerFactory.create(),
1✔
2003
          new AtomicReference<InternalConfigSelector>(null));
2004
    }
2005

2006
    @Override
2007
    public Object getInternalSubchannel() {
2008
      checkState(started, "Subchannel is not started");
1✔
2009
      return subchannel;
1✔
2010
    }
2011

2012
    @Override
2013
    public ChannelLogger getChannelLogger() {
2014
      return subchannelLogger;
1✔
2015
    }
2016

2017
    @Override
2018
    public void updateAddresses(List<EquivalentAddressGroup> addrs) {
2019
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
2020
      addressGroups = addrs;
1✔
2021
      if (authorityOverride != null) {
1✔
2022
        addrs = stripOverrideAuthorityAttributes(addrs);
1✔
2023
      }
2024
      subchannel.updateAddresses(addrs);
1✔
2025
    }
1✔
2026

2027
    @Override
2028
    public Attributes getConnectedAddressAttributes() {
2029
      return subchannel.getConnectedAddressAttributes();
1✔
2030
    }
2031

2032
    private List<EquivalentAddressGroup> stripOverrideAuthorityAttributes(
2033
        List<EquivalentAddressGroup> eags) {
2034
      List<EquivalentAddressGroup> eagsWithoutOverrideAttr = new ArrayList<>();
1✔
2035
      for (EquivalentAddressGroup eag : eags) {
1✔
2036
        EquivalentAddressGroup eagWithoutOverrideAttr = new EquivalentAddressGroup(
1✔
2037
            eag.getAddresses(),
1✔
2038
            eag.getAttributes().toBuilder().discard(ATTR_AUTHORITY_OVERRIDE).build());
1✔
2039
        eagsWithoutOverrideAttr.add(eagWithoutOverrideAttr);
1✔
2040
      }
1✔
2041
      return Collections.unmodifiableList(eagsWithoutOverrideAttr);
1✔
2042
    }
2043
  }
2044

2045
  @Override
2046
  public String toString() {
2047
    return MoreObjects.toStringHelper(this)
1✔
2048
        .add("logId", logId.getId())
1✔
2049
        .add("target", target)
1✔
2050
        .toString();
1✔
2051
  }
2052

2053
  /**
2054
   * Called from syncContext.
2055
   */
2056
  private final class DelayedTransportListener implements ManagedClientTransport.Listener {
1✔
2057
    @Override
2058
    public void transportShutdown(Status s) {
2059
      checkState(shutdown.get(), "Channel must have been shut down");
1✔
2060
    }
1✔
2061

2062
    @Override
2063
    public void transportReady() {
2064
      // Don't care
2065
    }
×
2066

2067
    @Override
2068
    public Attributes filterTransport(Attributes attributes) {
2069
      return attributes;
×
2070
    }
2071

2072
    @Override
2073
    public void transportInUse(final boolean inUse) {
2074
      inUseStateAggregator.updateObjectInUse(delayedTransport, inUse);
1✔
2075
      if (inUse) {
1✔
2076
        // It's possible to be in idle mode while inUseStateAggregator is in-use, if one of the
2077
        // subchannels is in use. But we should never be in idle mode when delayed transport is in
2078
        // use.
2079
        exitIdleMode();
1✔
2080
      }
2081
    }
1✔
2082

2083
    @Override
2084
    public void transportTerminated() {
2085
      checkState(shutdown.get(), "Channel must have been shut down");
1✔
2086
      terminating = true;
1✔
2087
      shutdownNameResolverAndLoadBalancer(false);
1✔
2088
      // No need to call channelStateManager since we are already in SHUTDOWN state.
2089
      // Until LoadBalancer is shutdown, it may still create new subchannels.  We catch them
2090
      // here.
2091
      maybeShutdownNowSubchannels();
1✔
2092
      maybeTerminateChannel();
1✔
2093
    }
1✔
2094
  }
2095

2096
  /**
2097
   * Must be accessed from syncContext.
2098
   */
2099
  private final class IdleModeStateAggregator extends InUseStateAggregator<Object> {
1✔
2100
    @Override
2101
    protected void handleInUse() {
2102
      exitIdleMode();
1✔
2103
    }
1✔
2104

2105
    @Override
2106
    protected void handleNotInUse() {
2107
      if (shutdown.get()) {
1✔
2108
        return;
1✔
2109
      }
2110
      rescheduleIdleTimer();
1✔
2111
    }
1✔
2112
  }
2113

2114
  /**
2115
   * Lazily request for Executor from an executor pool.
2116
   * Also act as an Executor directly to simply run a cmd
2117
   */
2118
  @VisibleForTesting
2119
  static final class ExecutorHolder implements Executor {
2120
    private final ObjectPool<? extends Executor> pool;
2121
    private Executor executor;
2122

2123
    ExecutorHolder(ObjectPool<? extends Executor> executorPool) {
1✔
2124
      this.pool = checkNotNull(executorPool, "executorPool");
1✔
2125
    }
1✔
2126

2127
    synchronized Executor getExecutor() {
2128
      if (executor == null) {
1✔
2129
        executor = checkNotNull(pool.getObject(), "%s.getObject()", executor);
1✔
2130
      }
2131
      return executor;
1✔
2132
    }
2133

2134
    synchronized void release() {
2135
      if (executor != null) {
1✔
2136
        executor = pool.returnObject(executor);
1✔
2137
      }
2138
    }
1✔
2139

2140
    @Override
2141
    public void execute(Runnable command) {
2142
      getExecutor().execute(command);
1✔
2143
    }
1✔
2144
  }
2145

2146
  private static final class RestrictedScheduledExecutor implements ScheduledExecutorService {
2147
    final ScheduledExecutorService delegate;
2148

2149
    private RestrictedScheduledExecutor(ScheduledExecutorService delegate) {
1✔
2150
      this.delegate = checkNotNull(delegate, "delegate");
1✔
2151
    }
1✔
2152

2153
    @Override
2154
    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
2155
      return delegate.schedule(callable, delay, unit);
×
2156
    }
2157

2158
    @Override
2159
    public ScheduledFuture<?> schedule(Runnable cmd, long delay, TimeUnit unit) {
2160
      return delegate.schedule(cmd, delay, unit);
1✔
2161
    }
2162

2163
    @Override
2164
    public ScheduledFuture<?> scheduleAtFixedRate(
2165
        Runnable command, long initialDelay, long period, TimeUnit unit) {
2166
      return delegate.scheduleAtFixedRate(command, initialDelay, period, unit);
1✔
2167
    }
2168

2169
    @Override
2170
    public ScheduledFuture<?> scheduleWithFixedDelay(
2171
        Runnable command, long initialDelay, long delay, TimeUnit unit) {
2172
      return delegate.scheduleWithFixedDelay(command, initialDelay, delay, unit);
×
2173
    }
2174

2175
    @Override
2176
    public boolean awaitTermination(long timeout, TimeUnit unit)
2177
        throws InterruptedException {
2178
      return delegate.awaitTermination(timeout, unit);
×
2179
    }
2180

2181
    @Override
2182
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
2183
        throws InterruptedException {
2184
      return delegate.invokeAll(tasks);
×
2185
    }
2186

2187
    @Override
2188
    public <T> List<Future<T>> invokeAll(
2189
        Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
2190
        throws InterruptedException {
2191
      return delegate.invokeAll(tasks, timeout, unit);
×
2192
    }
2193

2194
    @Override
2195
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
2196
        throws InterruptedException, ExecutionException {
2197
      return delegate.invokeAny(tasks);
×
2198
    }
2199

2200
    @Override
2201
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
2202
        throws InterruptedException, ExecutionException, TimeoutException {
2203
      return delegate.invokeAny(tasks, timeout, unit);
×
2204
    }
2205

2206
    @Override
2207
    public boolean isShutdown() {
2208
      return delegate.isShutdown();
×
2209
    }
2210

2211
    @Override
2212
    public boolean isTerminated() {
2213
      return delegate.isTerminated();
×
2214
    }
2215

2216
    @Override
2217
    public void shutdown() {
2218
      throw new UnsupportedOperationException("Restricted: shutdown() is not allowed");
1✔
2219
    }
2220

2221
    @Override
2222
    public List<Runnable> shutdownNow() {
2223
      throw new UnsupportedOperationException("Restricted: shutdownNow() is not allowed");
1✔
2224
    }
2225

2226
    @Override
2227
    public <T> Future<T> submit(Callable<T> task) {
2228
      return delegate.submit(task);
×
2229
    }
2230

2231
    @Override
2232
    public Future<?> submit(Runnable task) {
2233
      return delegate.submit(task);
×
2234
    }
2235

2236
    @Override
2237
    public <T> Future<T> submit(Runnable task, T result) {
2238
      return delegate.submit(task, result);
×
2239
    }
2240

2241
    @Override
2242
    public void execute(Runnable command) {
2243
      delegate.execute(command);
×
2244
    }
×
2245
  }
2246

2247
  /**
   * A ResolutionState indicates the status of last name resolution.
   */
  enum ResolutionState {
    /** Presumably the state before any resolution result or error arrived; confirm at the field. */
    NO_RESOLUTION,

    /** The last name resolution delivered addresses. */
    SUCCESS,

    /** The last name resolution failed. */
    ERROR
  }
2255
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc