• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19390

02 Aug 2024 03:10PM UTC coverage: 84.454% (-0.02%) from 84.476%
#19390

push

github

web-flow
Introduce onResult2 in NameResolver Listener2 that returns Status

Lets the Name Resolver receive the status of the acceptance of the name resolution by the load balancer.

33268 of 39392 relevant lines covered (84.45%)

0.84 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.67
/../core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
1
/*
2
 * Copyright 2016 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static com.google.common.base.Preconditions.checkState;
22
import static io.grpc.ClientStreamTracer.NAME_RESOLUTION_DELAYED;
23
import static io.grpc.ConnectivityState.CONNECTING;
24
import static io.grpc.ConnectivityState.IDLE;
25
import static io.grpc.ConnectivityState.SHUTDOWN;
26
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
27
import static io.grpc.EquivalentAddressGroup.ATTR_AUTHORITY_OVERRIDE;
28

29
import com.google.common.annotations.VisibleForTesting;
30
import com.google.common.base.MoreObjects;
31
import com.google.common.base.Stopwatch;
32
import com.google.common.base.Supplier;
33
import com.google.common.util.concurrent.ListenableFuture;
34
import com.google.common.util.concurrent.SettableFuture;
35
import io.grpc.Attributes;
36
import io.grpc.CallCredentials;
37
import io.grpc.CallOptions;
38
import io.grpc.Channel;
39
import io.grpc.ChannelCredentials;
40
import io.grpc.ChannelLogger;
41
import io.grpc.ChannelLogger.ChannelLogLevel;
42
import io.grpc.ClientCall;
43
import io.grpc.ClientInterceptor;
44
import io.grpc.ClientInterceptors;
45
import io.grpc.ClientStreamTracer;
46
import io.grpc.ClientTransportFilter;
47
import io.grpc.CompressorRegistry;
48
import io.grpc.ConnectivityState;
49
import io.grpc.ConnectivityStateInfo;
50
import io.grpc.Context;
51
import io.grpc.Deadline;
52
import io.grpc.DecompressorRegistry;
53
import io.grpc.EquivalentAddressGroup;
54
import io.grpc.ForwardingChannelBuilder2;
55
import io.grpc.ForwardingClientCall;
56
import io.grpc.Grpc;
57
import io.grpc.InternalChannelz;
58
import io.grpc.InternalChannelz.ChannelStats;
59
import io.grpc.InternalChannelz.ChannelTrace;
60
import io.grpc.InternalConfigSelector;
61
import io.grpc.InternalInstrumented;
62
import io.grpc.InternalLogId;
63
import io.grpc.InternalWithLogId;
64
import io.grpc.LoadBalancer;
65
import io.grpc.LoadBalancer.CreateSubchannelArgs;
66
import io.grpc.LoadBalancer.PickResult;
67
import io.grpc.LoadBalancer.PickSubchannelArgs;
68
import io.grpc.LoadBalancer.ResolvedAddresses;
69
import io.grpc.LoadBalancer.SubchannelPicker;
70
import io.grpc.LoadBalancer.SubchannelStateListener;
71
import io.grpc.ManagedChannel;
72
import io.grpc.ManagedChannelBuilder;
73
import io.grpc.Metadata;
74
import io.grpc.MethodDescriptor;
75
import io.grpc.MetricInstrumentRegistry;
76
import io.grpc.MetricRecorder;
77
import io.grpc.NameResolver;
78
import io.grpc.NameResolver.ConfigOrError;
79
import io.grpc.NameResolver.ResolutionResult;
80
import io.grpc.NameResolverProvider;
81
import io.grpc.NameResolverRegistry;
82
import io.grpc.ProxyDetector;
83
import io.grpc.Status;
84
import io.grpc.SynchronizationContext;
85
import io.grpc.SynchronizationContext.ScheduledHandle;
86
import io.grpc.internal.AutoConfiguredLoadBalancerFactory.AutoConfiguredLoadBalancer;
87
import io.grpc.internal.ClientCallImpl.ClientStreamProvider;
88
import io.grpc.internal.ClientTransportFactory.SwapChannelCredentialsResult;
89
import io.grpc.internal.ManagedChannelImplBuilder.ClientTransportFactoryBuilder;
90
import io.grpc.internal.ManagedChannelImplBuilder.FixedPortProvider;
91
import io.grpc.internal.ManagedChannelServiceConfig.MethodInfo;
92
import io.grpc.internal.ManagedChannelServiceConfig.ServiceConfigConvertedSelector;
93
import io.grpc.internal.RetriableStream.ChannelBufferMeter;
94
import io.grpc.internal.RetriableStream.Throttle;
95
import io.grpc.internal.RetryingNameResolver.ResolutionResultListener;
96
import java.net.URI;
97
import java.util.ArrayList;
98
import java.util.Collection;
99
import java.util.Collections;
100
import java.util.HashSet;
101
import java.util.LinkedHashSet;
102
import java.util.List;
103
import java.util.Map;
104
import java.util.Set;
105
import java.util.concurrent.Callable;
106
import java.util.concurrent.CountDownLatch;
107
import java.util.concurrent.ExecutionException;
108
import java.util.concurrent.Executor;
109
import java.util.concurrent.Future;
110
import java.util.concurrent.ScheduledExecutorService;
111
import java.util.concurrent.ScheduledFuture;
112
import java.util.concurrent.TimeUnit;
113
import java.util.concurrent.TimeoutException;
114
import java.util.concurrent.atomic.AtomicBoolean;
115
import java.util.concurrent.atomic.AtomicReference;
116
import java.util.logging.Level;
117
import java.util.logging.Logger;
118
import javax.annotation.Nullable;
119
import javax.annotation.concurrent.GuardedBy;
120
import javax.annotation.concurrent.ThreadSafe;
121

122
/** A communication channel for making outgoing RPCs. */
123
@ThreadSafe
124
final class ManagedChannelImpl extends ManagedChannel implements
125
    InternalInstrumented<ChannelStats> {
126
  @VisibleForTesting
127
  static final Logger logger = Logger.getLogger(ManagedChannelImpl.class.getName());
1✔
128

129
  static final long IDLE_TIMEOUT_MILLIS_DISABLE = -1;
130

131
  static final long SUBCHANNEL_SHUTDOWN_DELAY_SECONDS = 5;
132

133
  @VisibleForTesting
134
  static final Status SHUTDOWN_NOW_STATUS =
1✔
135
      Status.UNAVAILABLE.withDescription("Channel shutdownNow invoked");
1✔
136

137
  @VisibleForTesting
138
  static final Status SHUTDOWN_STATUS =
1✔
139
      Status.UNAVAILABLE.withDescription("Channel shutdown invoked");
1✔
140

141
  @VisibleForTesting
142
  static final Status SUBCHANNEL_SHUTDOWN_STATUS =
1✔
143
      Status.UNAVAILABLE.withDescription("Subchannel shutdown invoked");
1✔
144

145
  private static final ManagedChannelServiceConfig EMPTY_SERVICE_CONFIG =
146
      ManagedChannelServiceConfig.empty();
1✔
147
  private static final InternalConfigSelector INITIAL_PENDING_SELECTOR =
1✔
148
      new InternalConfigSelector() {
1✔
149
        @Override
150
        public Result selectConfig(PickSubchannelArgs args) {
151
          throw new IllegalStateException("Resolution is pending");
×
152
        }
153
      };
154
  private static final LoadBalancer.PickDetailsConsumer NOOP_PICK_DETAILS_CONSUMER =
1✔
155
      new LoadBalancer.PickDetailsConsumer() {};
1✔
156

157
  private final InternalLogId logId;
158
  private final String target;
159
  @Nullable
160
  private final String authorityOverride;
161
  private final NameResolverRegistry nameResolverRegistry;
162
  private final URI targetUri;
163
  private final NameResolverProvider nameResolverProvider;
164
  private final NameResolver.Args nameResolverArgs;
165
  private final AutoConfiguredLoadBalancerFactory loadBalancerFactory;
166
  private final ClientTransportFactory originalTransportFactory;
167
  @Nullable
168
  private final ChannelCredentials originalChannelCreds;
169
  private final ClientTransportFactory transportFactory;
170
  private final ClientTransportFactory oobTransportFactory;
171
  private final RestrictedScheduledExecutor scheduledExecutor;
172
  private final Executor executor;
173
  private final ObjectPool<? extends Executor> executorPool;
174
  private final ObjectPool<? extends Executor> balancerRpcExecutorPool;
175
  private final ExecutorHolder balancerRpcExecutorHolder;
176
  private final ExecutorHolder offloadExecutorHolder;
177
  private final TimeProvider timeProvider;
178
  private final int maxTraceEvents;
179

180
  @VisibleForTesting
1✔
181
  final SynchronizationContext syncContext = new SynchronizationContext(
182
      new Thread.UncaughtExceptionHandler() {
1✔
183
        @Override
184
        public void uncaughtException(Thread t, Throwable e) {
185
          logger.log(
1✔
186
              Level.SEVERE,
187
              "[" + getLogId() + "] Uncaught exception in the SynchronizationContext. Panic!",
1✔
188
              e);
189
          panic(e);
1✔
190
        }
1✔
191
      });
192

193
  private boolean fullStreamDecompression;
194

195
  private final DecompressorRegistry decompressorRegistry;
196
  private final CompressorRegistry compressorRegistry;
197

198
  private final Supplier<Stopwatch> stopwatchSupplier;
199
  /** The timeout before entering idle mode. */
200
  private final long idleTimeoutMillis;
201

202
  private final ConnectivityStateManager channelStateManager = new ConnectivityStateManager();
1✔
203
  private final BackoffPolicy.Provider backoffPolicyProvider;
204

205
  /**
206
   * We delegate to this channel, so that we can have interceptors as necessary. If there aren't
207
   * any interceptors and the {@link io.grpc.BinaryLog} is {@code null} then this will just be a
208
   * {@link RealChannel}.
209
   */
210
  private final Channel interceptorChannel;
211

212
  private final List<ClientTransportFilter> transportFilters;
213
  @Nullable private final String userAgent;
214

215
  // Only null after channel is terminated. Must be assigned from the syncContext.
216
  private NameResolver nameResolver;
217

218
  // Must be accessed from the syncContext.
219
  private boolean nameResolverStarted;
220

221
  // null when channel is in idle mode.  Must be assigned from syncContext.
222
  @Nullable
223
  private LbHelperImpl lbHelper;
224

225
  // Must ONLY be assigned from updateSubchannelPicker(), which is called from syncContext.
226
  // null if channel is in idle mode.
227
  @Nullable
228
  private volatile SubchannelPicker subchannelPicker;
229

230
  // Must be accessed from the syncContext
231
  private boolean panicMode;
232

233
  // Must be mutated from syncContext
234
  // If any monitoring hook to be added later needs to get a snapshot of this Set, we could
235
  // switch to a ConcurrentHashMap.
236
  private final Set<InternalSubchannel> subchannels = new HashSet<>(16, .75f);
1✔
237

238
  // Must be accessed from syncContext
239
  @Nullable
240
  private Collection<RealChannel.PendingCall<?, ?>> pendingCalls;
241
  private final Object pendingCallsInUseObject = new Object();
1✔
242

243
  // Must be mutated from syncContext
244
  private final Set<OobChannel> oobChannels = new HashSet<>(1, .75f);
1✔
245

246
  // reprocess() must be run from syncContext
247
  private final DelayedClientTransport delayedTransport;
248
  private final UncommittedRetriableStreamsRegistry uncommittedRetriableStreamsRegistry
1✔
249
      = new UncommittedRetriableStreamsRegistry();
250

251
  // Shutdown states.
252
  //
253
  // Channel's shutdown process:
254
  // 1. shutdown(): stop accepting new calls from applications
255
  //   1a shutdown <- true
256
  //   1b subchannelPicker <- null
257
  //   1c delayedTransport.shutdown()
258
  // 2. delayedTransport terminated: stop stream-creation functionality
259
  //   2a terminating <- true
260
  //   2b loadBalancer.shutdown()
261
  //     * LoadBalancer will shutdown subchannels and OOB channels
262
  //   2c loadBalancer <- null
263
  //   2d nameResolver.shutdown()
264
  //   2e nameResolver <- null
265
  // 3. All subchannels and OOB channels terminated: Channel considered terminated
266

267
  private final AtomicBoolean shutdown = new AtomicBoolean(false);
1✔
268
  // Must only be mutated and read from syncContext
269
  private boolean shutdownNowed;
270
  // Must only be mutated from syncContext
271
  private boolean terminating;
272
  // Must be mutated from syncContext
273
  private volatile boolean terminated;
274
  private final CountDownLatch terminatedLatch = new CountDownLatch(1);
1✔
275

276
  private final CallTracer.Factory callTracerFactory;
277
  private final CallTracer channelCallTracer;
278
  private final ChannelTracer channelTracer;
279
  private final ChannelLogger channelLogger;
280
  private final InternalChannelz channelz;
281
  private final RealChannel realChannel;
282
  // Must be mutated and read from syncContext
283
  // a flag for doing channel tracing when flipped
284
  private ResolutionState lastResolutionState = ResolutionState.NO_RESOLUTION;
1✔
285
  // Must be mutated and read from constructor or syncContext
286
  // used for channel tracing when value changed
287
  private ManagedChannelServiceConfig lastServiceConfig = EMPTY_SERVICE_CONFIG;
1✔
288

289
  @Nullable
290
  private final ManagedChannelServiceConfig defaultServiceConfig;
291
  // Must be mutated and read from constructor or syncContext
292
  private boolean serviceConfigUpdated = false;
1✔
293
  private final boolean lookUpServiceConfig;
294

295
  // One instance per channel.
296
  private final ChannelBufferMeter channelBufferUsed = new ChannelBufferMeter();
1✔
297

298
  private final long perRpcBufferLimit;
299
  private final long channelBufferLimit;
300

301
  // Temporary false flag that can skip the retry code path.
302
  private final boolean retryEnabled;
303

304
  private final Deadline.Ticker ticker = Deadline.getSystemTicker();
1✔
305

306
  // Called from syncContext
307
  private final ManagedClientTransport.Listener delayedTransportListener =
1✔
308
      new DelayedTransportListener();
309

310
  // Must be called from syncContext
311
  private void maybeShutdownNowSubchannels() {
312
    if (shutdownNowed) {
1✔
313
      for (InternalSubchannel subchannel : subchannels) {
1✔
314
        subchannel.shutdownNow(SHUTDOWN_NOW_STATUS);
1✔
315
      }
1✔
316
      for (OobChannel oobChannel : oobChannels) {
1✔
317
        oobChannel.getInternalSubchannel().shutdownNow(SHUTDOWN_NOW_STATUS);
1✔
318
      }
1✔
319
    }
320
  }
1✔
321

322
  // Must be accessed from syncContext
323
  @VisibleForTesting
1✔
324
  final InUseStateAggregator<Object> inUseStateAggregator = new IdleModeStateAggregator();
325

326
  @Override
327
  public ListenableFuture<ChannelStats> getStats() {
328
    final SettableFuture<ChannelStats> ret = SettableFuture.create();
1✔
329
    final class StatsFetcher implements Runnable {
1✔
330
      @Override
331
      public void run() {
332
        ChannelStats.Builder builder = new InternalChannelz.ChannelStats.Builder();
1✔
333
        channelCallTracer.updateBuilder(builder);
1✔
334
        channelTracer.updateBuilder(builder);
1✔
335
        builder.setTarget(target).setState(channelStateManager.getState());
1✔
336
        List<InternalWithLogId> children = new ArrayList<>();
1✔
337
        children.addAll(subchannels);
1✔
338
        children.addAll(oobChannels);
1✔
339
        builder.setSubchannels(children);
1✔
340
        ret.set(builder.build());
1✔
341
      }
1✔
342
    }
343

344
    // subchannels and oobchannels can only be accessed from syncContext
345
    syncContext.execute(new StatsFetcher());
1✔
346
    return ret;
1✔
347
  }
348

349
  @Override
350
  public InternalLogId getLogId() {
351
    return logId;
1✔
352
  }
353

354
  // Run from syncContext
355
  private class IdleModeTimer implements Runnable {
1✔
356

357
    @Override
358
    public void run() {
359
      // Workaround timer scheduled while in idle mode. This can happen from handleNotInUse() after
360
      // an explicit enterIdleMode() by the user. Protecting here as other locations are a bit too
361
      // subtle to change rapidly to resolve the channel panic. See #8714
362
      if (lbHelper == null) {
1✔
363
        return;
×
364
      }
365
      enterIdleMode();
1✔
366
    }
1✔
367
  }
368

369
  // Must be called from syncContext
370
  private void shutdownNameResolverAndLoadBalancer(boolean channelIsActive) {
371
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
372
    if (channelIsActive) {
1✔
373
      checkState(nameResolverStarted, "nameResolver is not started");
1✔
374
      checkState(lbHelper != null, "lbHelper is null");
1✔
375
    }
376
    if (nameResolver != null) {
1✔
377
      nameResolver.shutdown();
1✔
378
      nameResolverStarted = false;
1✔
379
      if (channelIsActive) {
1✔
380
        nameResolver = getNameResolver(
1✔
381
            targetUri, authorityOverride, nameResolverProvider, nameResolverArgs);
382
      } else {
383
        nameResolver = null;
1✔
384
      }
385
    }
386
    if (lbHelper != null) {
1✔
387
      lbHelper.lb.shutdown();
1✔
388
      lbHelper = null;
1✔
389
    }
390
    subchannelPicker = null;
1✔
391
  }
1✔
392

393
  /**
394
   * Make the channel exit idle mode, if it's in it.
395
   *
396
   * <p>Must be called from syncContext
397
   */
398
  @VisibleForTesting
399
  void exitIdleMode() {
400
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
401
    if (shutdown.get() || panicMode) {
1✔
402
      return;
1✔
403
    }
404
    if (inUseStateAggregator.isInUse()) {
1✔
405
      // Cancel the timer now, so that a racing due timer will not put Channel on idleness
406
      // when the caller of exitIdleMode() is about to use the returned loadBalancer.
407
      cancelIdleTimer(false);
1✔
408
    } else {
409
      // exitIdleMode() may be called outside of inUseStateAggregator.handleNotInUse() while
410
      // isInUse() == false, in which case we still need to schedule the timer.
411
      rescheduleIdleTimer();
1✔
412
    }
413
    if (lbHelper != null) {
1✔
414
      return;
1✔
415
    }
416
    channelLogger.log(ChannelLogLevel.INFO, "Exiting idle mode");
1✔
417
    LbHelperImpl lbHelper = new LbHelperImpl();
1✔
418
    lbHelper.lb = loadBalancerFactory.newLoadBalancer(lbHelper);
1✔
419
    // Delay setting lbHelper until fully initialized, since loadBalancerFactory is user code and
420
    // may throw. We don't want to confuse our state, even if we will enter panic mode.
421
    this.lbHelper = lbHelper;
1✔
422

423
    channelStateManager.gotoState(CONNECTING);
1✔
424
    NameResolverListener listener = new NameResolverListener(lbHelper, nameResolver);
1✔
425
    nameResolver.start(listener);
1✔
426
    nameResolverStarted = true;
1✔
427
  }
1✔
428

429
  // Must be run from syncContext
430
  private void enterIdleMode() {
431
    // nameResolver and loadBalancer are guaranteed to be non-null.  If any of them were null,
432
    // either the idleModeTimer ran twice without exiting the idle mode, or the task in shutdown()
433
    // did not cancel idleModeTimer, or enterIdle() ran while shutdown or in idle, all of
434
    // which are bugs.
435
    shutdownNameResolverAndLoadBalancer(true);
1✔
436
    delayedTransport.reprocess(null);
1✔
437
    channelLogger.log(ChannelLogLevel.INFO, "Entering IDLE state");
1✔
438
    channelStateManager.gotoState(IDLE);
1✔
439
    // If the inUseStateAggregator still considers pending calls to be queued up or the delayed
440
    // transport to be holding some we need to exit idle mode to give these calls a chance to
441
    // be processed.
442
    if (inUseStateAggregator.anyObjectInUse(pendingCallsInUseObject, delayedTransport)) {
1✔
443
      exitIdleMode();
1✔
444
    }
445
  }
1✔
446

447
  // Must be run from syncContext
448
  private void cancelIdleTimer(boolean permanent) {
449
    idleTimer.cancel(permanent);
1✔
450
  }
1✔
451

452
  // Always run from syncContext
453
  private void rescheduleIdleTimer() {
454
    if (idleTimeoutMillis == IDLE_TIMEOUT_MILLIS_DISABLE) {
1✔
455
      return;
1✔
456
    }
457
    idleTimer.reschedule(idleTimeoutMillis, TimeUnit.MILLISECONDS);
1✔
458
  }
1✔
459

460
  /**
461
   * Force name resolution refresh to happen immediately. Must be run
462
   * from syncContext.
463
   */
464
  private void refreshNameResolution() {
465
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
466
    if (nameResolverStarted) {
1✔
467
      nameResolver.refresh();
1✔
468
    }
469
  }
1✔
470

471
  private final class ChannelStreamProvider implements ClientStreamProvider {
1✔
472
    volatile Throttle throttle;
473

474
    @Override
475
    public ClientStream newStream(
476
        final MethodDescriptor<?, ?> method,
477
        final CallOptions callOptions,
478
        final Metadata headers,
479
        final Context context) {
480
      // There is no need to reschedule the idle timer here. If the channel isn't shut down, either
481
      // the delayed transport or a real transport will go in-use and cancel the idle timer.
482
      if (!retryEnabled) {
1✔
483
        ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers(
1✔
484
            callOptions, headers, 0, /* isTransparentRetry= */ false);
485
        Context origContext = context.attach();
1✔
486
        try {
487
          return delayedTransport.newStream(method, headers, callOptions, tracers);
1✔
488
        } finally {
489
          context.detach(origContext);
1✔
490
        }
491
      } else {
492
        MethodInfo methodInfo = callOptions.getOption(MethodInfo.KEY);
1✔
493
        final RetryPolicy retryPolicy = methodInfo == null ? null : methodInfo.retryPolicy;
1✔
494
        final HedgingPolicy hedgingPolicy = methodInfo == null ? null : methodInfo.hedgingPolicy;
1✔
495
        final class RetryStream<ReqT> extends RetriableStream<ReqT> {
496
          @SuppressWarnings("unchecked")
497
          RetryStream() {
1✔
498
            super(
1✔
499
                (MethodDescriptor<ReqT, ?>) method,
500
                headers,
501
                channelBufferUsed,
1✔
502
                perRpcBufferLimit,
1✔
503
                channelBufferLimit,
1✔
504
                getCallExecutor(callOptions),
1✔
505
                transportFactory.getScheduledExecutorService(),
1✔
506
                retryPolicy,
507
                hedgingPolicy,
508
                throttle);
509
          }
1✔
510

511
          @Override
512
          Status prestart() {
513
            return uncommittedRetriableStreamsRegistry.add(this);
1✔
514
          }
515

516
          @Override
517
          void postCommit() {
518
            uncommittedRetriableStreamsRegistry.remove(this);
1✔
519
          }
1✔
520

521
          @Override
522
          ClientStream newSubstream(
523
              Metadata newHeaders, ClientStreamTracer.Factory factory, int previousAttempts,
524
              boolean isTransparentRetry) {
525
            CallOptions newOptions = callOptions.withStreamTracerFactory(factory);
1✔
526
            ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers(
1✔
527
                newOptions, newHeaders, previousAttempts, isTransparentRetry);
528
            Context origContext = context.attach();
1✔
529
            try {
530
              return delayedTransport.newStream(method, newHeaders, newOptions, tracers);
1✔
531
            } finally {
532
              context.detach(origContext);
1✔
533
            }
534
          }
535
        }
536

537
        return new RetryStream<>();
1✔
538
      }
539
    }
540
  }
541

542
  private final ChannelStreamProvider transportProvider = new ChannelStreamProvider();
1✔
543

544
  private final Rescheduler idleTimer;
545
  private final MetricRecorder metricRecorder;
546

547
  ManagedChannelImpl(
548
      ManagedChannelImplBuilder builder,
549
      ClientTransportFactory clientTransportFactory,
550
      URI targetUri,
551
      NameResolverProvider nameResolverProvider,
552
      BackoffPolicy.Provider backoffPolicyProvider,
553
      ObjectPool<? extends Executor> balancerRpcExecutorPool,
554
      Supplier<Stopwatch> stopwatchSupplier,
555
      List<ClientInterceptor> interceptors,
556
      final TimeProvider timeProvider) {
1✔
557
    this.target = checkNotNull(builder.target, "target");
1✔
558
    this.logId = InternalLogId.allocate("Channel", target);
1✔
559
    this.timeProvider = checkNotNull(timeProvider, "timeProvider");
1✔
560
    this.executorPool = checkNotNull(builder.executorPool, "executorPool");
1✔
561
    this.executor = checkNotNull(executorPool.getObject(), "executor");
1✔
562
    this.originalChannelCreds = builder.channelCredentials;
1✔
563
    this.originalTransportFactory = clientTransportFactory;
1✔
564
    this.offloadExecutorHolder =
1✔
565
        new ExecutorHolder(checkNotNull(builder.offloadExecutorPool, "offloadExecutorPool"));
1✔
566
    this.transportFactory = new CallCredentialsApplyingTransportFactory(
1✔
567
        clientTransportFactory, builder.callCredentials, this.offloadExecutorHolder);
568
    this.oobTransportFactory = new CallCredentialsApplyingTransportFactory(
1✔
569
        clientTransportFactory, null, this.offloadExecutorHolder);
570
    this.scheduledExecutor =
1✔
571
        new RestrictedScheduledExecutor(transportFactory.getScheduledExecutorService());
1✔
572
    maxTraceEvents = builder.maxTraceEvents;
1✔
573
    channelTracer = new ChannelTracer(
1✔
574
        logId, builder.maxTraceEvents, timeProvider.currentTimeNanos(),
1✔
575
        "Channel for '" + target + "'");
576
    channelLogger = new ChannelLoggerImpl(channelTracer, timeProvider);
1✔
577
    ProxyDetector proxyDetector =
578
        builder.proxyDetector != null ? builder.proxyDetector : GrpcUtil.DEFAULT_PROXY_DETECTOR;
1✔
579
    this.retryEnabled = builder.retryEnabled;
1✔
580
    this.loadBalancerFactory = new AutoConfiguredLoadBalancerFactory(builder.defaultLbPolicy);
1✔
581
    this.nameResolverRegistry = builder.nameResolverRegistry;
1✔
582
    this.targetUri = checkNotNull(targetUri, "targetUri");
1✔
583
    this.nameResolverProvider = checkNotNull(nameResolverProvider, "nameResolverProvider");
1✔
584
    ScParser serviceConfigParser =
1✔
585
        new ScParser(
586
            retryEnabled,
587
            builder.maxRetryAttempts,
588
            builder.maxHedgedAttempts,
589
            loadBalancerFactory);
590
    this.authorityOverride = builder.authorityOverride;
1✔
591
    this.nameResolverArgs =
1✔
592
        NameResolver.Args.newBuilder()
1✔
593
            .setDefaultPort(builder.getDefaultPort())
1✔
594
            .setProxyDetector(proxyDetector)
1✔
595
            .setSynchronizationContext(syncContext)
1✔
596
            .setScheduledExecutorService(scheduledExecutor)
1✔
597
            .setServiceConfigParser(serviceConfigParser)
1✔
598
            .setChannelLogger(channelLogger)
1✔
599
            .setOffloadExecutor(this.offloadExecutorHolder)
1✔
600
            .setOverrideAuthority(this.authorityOverride)
1✔
601
            .build();
1✔
602
    this.nameResolver = getNameResolver(
1✔
603
        targetUri, authorityOverride, nameResolverProvider, nameResolverArgs);
604
    this.balancerRpcExecutorPool = checkNotNull(balancerRpcExecutorPool, "balancerRpcExecutorPool");
1✔
605
    this.balancerRpcExecutorHolder = new ExecutorHolder(balancerRpcExecutorPool);
1✔
606
    this.delayedTransport = new DelayedClientTransport(this.executor, this.syncContext);
1✔
607
    this.delayedTransport.start(delayedTransportListener);
1✔
608
    this.backoffPolicyProvider = backoffPolicyProvider;
1✔
609

610
    if (builder.defaultServiceConfig != null) {
1✔
611
      ConfigOrError parsedDefaultServiceConfig =
1✔
612
          serviceConfigParser.parseServiceConfig(builder.defaultServiceConfig);
1✔
613
      checkState(
1✔
614
          parsedDefaultServiceConfig.getError() == null,
1✔
615
          "Default config is invalid: %s",
616
          parsedDefaultServiceConfig.getError());
1✔
617
      this.defaultServiceConfig =
1✔
618
          (ManagedChannelServiceConfig) parsedDefaultServiceConfig.getConfig();
1✔
619
      this.transportProvider.throttle = this.defaultServiceConfig.getRetryThrottling();
1✔
620
    } else {
1✔
621
      this.defaultServiceConfig = null;
1✔
622
    }
623
    this.lookUpServiceConfig = builder.lookUpServiceConfig;
1✔
624
    realChannel = new RealChannel(nameResolver.getServiceAuthority());
1✔
625
    Channel channel = realChannel;
1✔
626
    if (builder.binlog != null) {
1✔
627
      channel = builder.binlog.wrapChannel(channel);
1✔
628
    }
629
    this.interceptorChannel = ClientInterceptors.intercept(channel, interceptors);
1✔
630
    this.transportFilters = new ArrayList<>(builder.transportFilters);
1✔
631
    this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier");
1✔
632
    if (builder.idleTimeoutMillis == IDLE_TIMEOUT_MILLIS_DISABLE) {
1✔
633
      this.idleTimeoutMillis = builder.idleTimeoutMillis;
1✔
634
    } else {
635
      checkArgument(
1✔
636
          builder.idleTimeoutMillis
637
              >= ManagedChannelImplBuilder.IDLE_MODE_MIN_TIMEOUT_MILLIS,
638
          "invalid idleTimeoutMillis %s", builder.idleTimeoutMillis);
639
      this.idleTimeoutMillis = builder.idleTimeoutMillis;
1✔
640
    }
641

642
    idleTimer = new Rescheduler(
1✔
643
        new IdleModeTimer(),
644
        syncContext,
645
        transportFactory.getScheduledExecutorService(),
1✔
646
        stopwatchSupplier.get());
1✔
647
    this.fullStreamDecompression = builder.fullStreamDecompression;
1✔
648
    this.decompressorRegistry = checkNotNull(builder.decompressorRegistry, "decompressorRegistry");
1✔
649
    this.compressorRegistry = checkNotNull(builder.compressorRegistry, "compressorRegistry");
1✔
650
    this.userAgent = builder.userAgent;
1✔
651

652
    this.channelBufferLimit = builder.retryBufferSize;
1✔
653
    this.perRpcBufferLimit = builder.perRpcBufferLimit;
1✔
654
    final class ChannelCallTracerFactory implements CallTracer.Factory {
1✔
655
      @Override
656
      public CallTracer create() {
657
        return new CallTracer(timeProvider);
1✔
658
      }
659
    }
660

661
    this.callTracerFactory = new ChannelCallTracerFactory();
1✔
662
    channelCallTracer = callTracerFactory.create();
1✔
663
    this.channelz = checkNotNull(builder.channelz);
1✔
664
    channelz.addRootChannel(this);
1✔
665

666
    if (!lookUpServiceConfig) {
1✔
667
      if (defaultServiceConfig != null) {
1✔
668
        channelLogger.log(
1✔
669
            ChannelLogLevel.INFO, "Service config look-up disabled, using default service config");
670
      }
671
      serviceConfigUpdated = true;
1✔
672
    }
673
    this.metricRecorder = new MetricRecorderImpl(builder.metricSinks,
1✔
674
        MetricInstrumentRegistry.getDefaultRegistry());
1✔
675
  }
1✔
676

677
  @VisibleForTesting
678
  static NameResolver getNameResolver(
679
      URI targetUri, @Nullable final String overrideAuthority,
680
      NameResolverProvider provider, NameResolver.Args nameResolverArgs) {
681
    NameResolver resolver = provider.newNameResolver(targetUri, nameResolverArgs);
1✔
682
    if (resolver == null) {
1✔
683
      throw new IllegalArgumentException("cannot create a NameResolver for " + targetUri);
1✔
684
    }
685

686
    // We wrap the name resolver in a RetryingNameResolver to give it the ability to retry failures.
687
    // TODO: After a transition period, all NameResolver implementations that need retry should use
688
    //       RetryingNameResolver directly and this step can be removed.
689
    NameResolver usedNameResolver = new RetryingNameResolver(resolver,
1✔
690
          new BackoffPolicyRetryScheduler(new ExponentialBackoffPolicy.Provider(),
691
              nameResolverArgs.getScheduledExecutorService(),
1✔
692
              nameResolverArgs.getSynchronizationContext()),
1✔
693
          nameResolverArgs.getSynchronizationContext());
1✔
694

695
    if (overrideAuthority == null) {
1✔
696
      return usedNameResolver;
1✔
697
    }
698

699
    return new ForwardingNameResolver(usedNameResolver) {
1✔
700
      @Override
701
      public String getServiceAuthority() {
702
        return overrideAuthority;
1✔
703
      }
704
    };
705
  }
706

707
  @VisibleForTesting
708
  InternalConfigSelector getConfigSelector() {
709
    return realChannel.configSelector.get();
1✔
710
  }
711
  
712
  @VisibleForTesting
713
  boolean hasThrottle() {
714
    return this.transportProvider.throttle != null;
1✔
715
  }
716

717
  /**
718
   * Initiates an orderly shutdown in which preexisting calls continue but new calls are immediately
719
   * cancelled.
720
   */
721
  @Override
722
  public ManagedChannelImpl shutdown() {
723
    channelLogger.log(ChannelLogLevel.DEBUG, "shutdown() called");
1✔
724
    if (!shutdown.compareAndSet(false, true)) {
1✔
725
      return this;
1✔
726
    }
727
    final class Shutdown implements Runnable {
1✔
728
      @Override
729
      public void run() {
730
        channelLogger.log(ChannelLogLevel.INFO, "Entering SHUTDOWN state");
1✔
731
        channelStateManager.gotoState(SHUTDOWN);
1✔
732
      }
1✔
733
    }
734

735
    syncContext.execute(new Shutdown());
1✔
736
    realChannel.shutdown();
1✔
737
    final class CancelIdleTimer implements Runnable {
1✔
738
      @Override
739
      public void run() {
740
        cancelIdleTimer(/* permanent= */ true);
1✔
741
      }
1✔
742
    }
743

744
    syncContext.execute(new CancelIdleTimer());
1✔
745
    return this;
1✔
746
  }
747

748
  /**
749
   * Initiates a forceful shutdown in which preexisting and new calls are cancelled. Although
750
   * forceful, the shutdown process is still not instantaneous; {@link #isTerminated()} will likely
751
   * return {@code false} immediately after this method returns.
752
   */
753
  @Override
754
  public ManagedChannelImpl shutdownNow() {
755
    channelLogger.log(ChannelLogLevel.DEBUG, "shutdownNow() called");
1✔
756
    shutdown();
1✔
757
    realChannel.shutdownNow();
1✔
758
    final class ShutdownNow implements Runnable {
1✔
759
      @Override
760
      public void run() {
761
        if (shutdownNowed) {
1✔
762
          return;
1✔
763
        }
764
        shutdownNowed = true;
1✔
765
        maybeShutdownNowSubchannels();
1✔
766
      }
1✔
767
    }
768

769
    syncContext.execute(new ShutdownNow());
1✔
770
    return this;
1✔
771
  }
772

773
  // Called from syncContext
774
  @VisibleForTesting
775
  void panic(final Throwable t) {
776
    if (panicMode) {
1✔
777
      // Preserve the first panic information
778
      return;
×
779
    }
780
    panicMode = true;
1✔
781
    cancelIdleTimer(/* permanent= */ true);
1✔
782
    shutdownNameResolverAndLoadBalancer(false);
1✔
783
    final class PanicSubchannelPicker extends SubchannelPicker {
1✔
784
      private final PickResult panicPickResult =
1✔
785
          PickResult.withDrop(
1✔
786
              Status.INTERNAL.withDescription("Panic! This is a bug!").withCause(t));
1✔
787

788
      @Override
789
      public PickResult pickSubchannel(PickSubchannelArgs args) {
790
        return panicPickResult;
1✔
791
      }
792

793
      @Override
794
      public String toString() {
795
        return MoreObjects.toStringHelper(PanicSubchannelPicker.class)
×
796
            .add("panicPickResult", panicPickResult)
×
797
            .toString();
×
798
      }
799
    }
800

801
    updateSubchannelPicker(new PanicSubchannelPicker());
1✔
802
    realChannel.updateConfigSelector(null);
1✔
803
    channelLogger.log(ChannelLogLevel.ERROR, "PANIC! Entering TRANSIENT_FAILURE");
1✔
804
    channelStateManager.gotoState(TRANSIENT_FAILURE);
1✔
805
  }
1✔
806

807
  @VisibleForTesting
808
  boolean isInPanicMode() {
809
    return panicMode;
1✔
810
  }
811

812
  // Called from syncContext
813
  private void updateSubchannelPicker(SubchannelPicker newPicker) {
814
    subchannelPicker = newPicker;
1✔
815
    delayedTransport.reprocess(newPicker);
1✔
816
  }
1✔
817

818
  @Override
819
  public boolean isShutdown() {
820
    return shutdown.get();
1✔
821
  }
822

823
  @Override
824
  public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
825
    return terminatedLatch.await(timeout, unit);
1✔
826
  }
827

828
  @Override
829
  public boolean isTerminated() {
830
    return terminated;
1✔
831
  }
832

833
  /*
834
   * Creates a new outgoing call on the channel.
835
   */
836
  @Override
837
  public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(MethodDescriptor<ReqT, RespT> method,
838
      CallOptions callOptions) {
839
    return interceptorChannel.newCall(method, callOptions);
1✔
840
  }
841

842
  @Override
843
  public String authority() {
844
    return interceptorChannel.authority();
1✔
845
  }
846

847
  private Executor getCallExecutor(CallOptions callOptions) {
848
    Executor executor = callOptions.getExecutor();
1✔
849
    if (executor == null) {
1✔
850
      executor = this.executor;
1✔
851
    }
852
    return executor;
1✔
853
  }
854

855
  private class RealChannel extends Channel {
856
    // Reference to null if no config selector is available from resolution result
857
    // Reference must be set() from syncContext
858
    private final AtomicReference<InternalConfigSelector> configSelector =
1✔
859
        new AtomicReference<>(INITIAL_PENDING_SELECTOR);
1✔
860
    // Set when the NameResolver is initially created. When we create a new NameResolver for the
861
    // same target, the new instance must have the same value.
862
    private final String authority;
863

864
    private final Channel clientCallImplChannel = new Channel() {
1✔
865
      @Override
866
      public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
867
          MethodDescriptor<RequestT, ResponseT> method, CallOptions callOptions) {
868
        return new ClientCallImpl<>(
1✔
869
            method,
870
            getCallExecutor(callOptions),
1✔
871
            callOptions,
872
            transportProvider,
1✔
873
            terminated ? null : transportFactory.getScheduledExecutorService(),
1✔
874
            channelCallTracer,
1✔
875
            null)
876
            .setFullStreamDecompression(fullStreamDecompression)
1✔
877
            .setDecompressorRegistry(decompressorRegistry)
1✔
878
            .setCompressorRegistry(compressorRegistry);
1✔
879
      }
880

881
      @Override
882
      public String authority() {
883
        return authority;
×
884
      }
885
    };
886

887
    private RealChannel(String authority) {
1✔
888
      this.authority =  checkNotNull(authority, "authority");
1✔
889
    }
1✔
890

891
    @Override
892
    public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(
893
        MethodDescriptor<ReqT, RespT> method, CallOptions callOptions) {
894
      if (configSelector.get() != INITIAL_PENDING_SELECTOR) {
1✔
895
        return newClientCall(method, callOptions);
1✔
896
      }
897
      syncContext.execute(new Runnable() {
1✔
898
        @Override
899
        public void run() {
900
          exitIdleMode();
1✔
901
        }
1✔
902
      });
903
      if (configSelector.get() != INITIAL_PENDING_SELECTOR) {
1✔
904
        // This is an optimization for the case (typically with InProcessTransport) when name
905
        // resolution result is immediately available at this point. Otherwise, some users'
906
        // tests might observe slight behavior difference from earlier grpc versions.
907
        return newClientCall(method, callOptions);
1✔
908
      }
909
      if (shutdown.get()) {
1✔
910
        // Return a failing ClientCall.
911
        return new ClientCall<ReqT, RespT>() {
×
912
          @Override
913
          public void start(Listener<RespT> responseListener, Metadata headers) {
914
            responseListener.onClose(SHUTDOWN_STATUS, new Metadata());
×
915
          }
×
916

917
          @Override public void request(int numMessages) {}
×
918

919
          @Override public void cancel(@Nullable String message, @Nullable Throwable cause) {}
×
920

921
          @Override public void halfClose() {}
×
922

923
          @Override public void sendMessage(ReqT message) {}
×
924
        };
925
      }
926
      Context context = Context.current();
1✔
927
      final PendingCall<ReqT, RespT> pendingCall = new PendingCall<>(context, method, callOptions);
1✔
928
      syncContext.execute(new Runnable() {
1✔
929
        @Override
930
        public void run() {
931
          if (configSelector.get() == INITIAL_PENDING_SELECTOR) {
1✔
932
            if (pendingCalls == null) {
1✔
933
              pendingCalls = new LinkedHashSet<>();
1✔
934
              inUseStateAggregator.updateObjectInUse(pendingCallsInUseObject, true);
1✔
935
            }
936
            pendingCalls.add(pendingCall);
1✔
937
          } else {
938
            pendingCall.reprocess();
1✔
939
          }
940
        }
1✔
941
      });
942
      return pendingCall;
1✔
943
    }
944

945
    // Must run in SynchronizationContext.
946
    void updateConfigSelector(@Nullable InternalConfigSelector config) {
947
      InternalConfigSelector prevConfig = configSelector.get();
1✔
948
      configSelector.set(config);
1✔
949
      if (prevConfig == INITIAL_PENDING_SELECTOR && pendingCalls != null) {
1✔
950
        for (RealChannel.PendingCall<?, ?> pendingCall : pendingCalls) {
1✔
951
          pendingCall.reprocess();
1✔
952
        }
1✔
953
      }
954
    }
1✔
955

956
    // Must run in SynchronizationContext.
957
    void onConfigError() {
958
      if (configSelector.get() == INITIAL_PENDING_SELECTOR) {
1✔
959
        updateConfigSelector(null);
1✔
960
      }
961
    }
1✔
962

963
    void shutdown() {
964
      final class RealChannelShutdown implements Runnable {
1✔
965
        @Override
966
        public void run() {
967
          if (pendingCalls == null) {
1✔
968
            if (configSelector.get() == INITIAL_PENDING_SELECTOR) {
1✔
969
              configSelector.set(null);
1✔
970
            }
971
            uncommittedRetriableStreamsRegistry.onShutdown(SHUTDOWN_STATUS);
1✔
972
          }
973
        }
1✔
974
      }
975

976
      syncContext.execute(new RealChannelShutdown());
1✔
977
    }
1✔
978

979
    void shutdownNow() {
980
      final class RealChannelShutdownNow implements Runnable {
1✔
981
        @Override
982
        public void run() {
983
          if (configSelector.get() == INITIAL_PENDING_SELECTOR) {
1✔
984
            configSelector.set(null);
1✔
985
          }
986
          if (pendingCalls != null) {
1✔
987
            for (RealChannel.PendingCall<?, ?> pendingCall : pendingCalls) {
1✔
988
              pendingCall.cancel("Channel is forcefully shutdown", null);
1✔
989
            }
1✔
990
          }
991
          uncommittedRetriableStreamsRegistry.onShutdownNow(SHUTDOWN_NOW_STATUS);
1✔
992
        }
1✔
993
      }
994

995
      syncContext.execute(new RealChannelShutdownNow());
1✔
996
    }
1✔
997

998
    @Override
999
    public String authority() {
1000
      return authority;
1✔
1001
    }
1002

1003
    private final class PendingCall<ReqT, RespT> extends DelayedClientCall<ReqT, RespT> {
1004
      final Context context;
1005
      final MethodDescriptor<ReqT, RespT> method;
1006
      final CallOptions callOptions;
1007
      private final long callCreationTime;
1008

1009
      PendingCall(
1010
          Context context, MethodDescriptor<ReqT, RespT> method, CallOptions callOptions) {
1✔
1011
        super(getCallExecutor(callOptions), scheduledExecutor, callOptions.getDeadline());
1✔
1012
        this.context = context;
1✔
1013
        this.method = method;
1✔
1014
        this.callOptions = callOptions;
1✔
1015
        this.callCreationTime = ticker.nanoTime();
1✔
1016
      }
1✔
1017

1018
      /** Called when it's ready to create a real call and reprocess the pending call. */
1019
      void reprocess() {
1020
        ClientCall<ReqT, RespT> realCall;
1021
        Context previous = context.attach();
1✔
1022
        try {
1023
          CallOptions delayResolutionOption = callOptions.withOption(NAME_RESOLUTION_DELAYED,
1✔
1024
              ticker.nanoTime() - callCreationTime);
1✔
1025
          realCall = newClientCall(method, delayResolutionOption);
1✔
1026
        } finally {
1027
          context.detach(previous);
1✔
1028
        }
1029
        Runnable toRun = setCall(realCall);
1✔
1030
        if (toRun == null) {
1✔
1031
          syncContext.execute(new PendingCallRemoval());
1✔
1032
        } else {
1033
          getCallExecutor(callOptions).execute(new Runnable() {
1✔
1034
            @Override
1035
            public void run() {
1036
              toRun.run();
1✔
1037
              syncContext.execute(new PendingCallRemoval());
1✔
1038
            }
1✔
1039
          });
1040
        }
1041
      }
1✔
1042

1043
      @Override
1044
      protected void callCancelled() {
1045
        super.callCancelled();
1✔
1046
        syncContext.execute(new PendingCallRemoval());
1✔
1047
      }
1✔
1048

1049
      final class PendingCallRemoval implements Runnable {
1✔
1050
        @Override
1051
        public void run() {
1052
          if (pendingCalls != null) {
1✔
1053
            pendingCalls.remove(PendingCall.this);
1✔
1054
            if (pendingCalls.isEmpty()) {
1✔
1055
              inUseStateAggregator.updateObjectInUse(pendingCallsInUseObject, false);
1✔
1056
              pendingCalls = null;
1✔
1057
              if (shutdown.get()) {
1✔
1058
                uncommittedRetriableStreamsRegistry.onShutdown(SHUTDOWN_STATUS);
1✔
1059
              }
1060
            }
1061
          }
1062
        }
1✔
1063
      }
1064
    }
1065

1066
    private <ReqT, RespT> ClientCall<ReqT, RespT> newClientCall(
1067
        MethodDescriptor<ReqT, RespT> method, CallOptions callOptions) {
1068
      InternalConfigSelector selector = configSelector.get();
1✔
1069
      if (selector == null) {
1✔
1070
        return clientCallImplChannel.newCall(method, callOptions);
1✔
1071
      }
1072
      if (selector instanceof ServiceConfigConvertedSelector) {
1✔
1073
        MethodInfo methodInfo =
1✔
1074
            ((ServiceConfigConvertedSelector) selector).config.getMethodConfig(method);
1✔
1075
        if (methodInfo != null) {
1✔
1076
          callOptions = callOptions.withOption(MethodInfo.KEY, methodInfo);
1✔
1077
        }
1078
        return clientCallImplChannel.newCall(method, callOptions);
1✔
1079
      }
1080
      return new ConfigSelectingClientCall<>(
1✔
1081
          selector, clientCallImplChannel, executor, method, callOptions);
1✔
1082
    }
1083
  }
1084

1085
  /**
1086
   * A client call for a given channel that applies a given config selector when it starts.
1087
   */
1088
  static final class ConfigSelectingClientCall<ReqT, RespT>
1089
      extends ForwardingClientCall<ReqT, RespT> {
1090

1091
    private final InternalConfigSelector configSelector;
1092
    private final Channel channel;
1093
    private final Executor callExecutor;
1094
    private final MethodDescriptor<ReqT, RespT> method;
1095
    private final Context context;
1096
    private CallOptions callOptions;
1097

1098
    private ClientCall<ReqT, RespT> delegate;
1099

1100
    ConfigSelectingClientCall(
1101
        InternalConfigSelector configSelector, Channel channel, Executor channelExecutor,
1102
        MethodDescriptor<ReqT, RespT> method,
1103
        CallOptions callOptions) {
1✔
1104
      this.configSelector = configSelector;
1✔
1105
      this.channel = channel;
1✔
1106
      this.method = method;
1✔
1107
      this.callExecutor =
1✔
1108
          callOptions.getExecutor() == null ? channelExecutor : callOptions.getExecutor();
1✔
1109
      this.callOptions = callOptions.withExecutor(callExecutor);
1✔
1110
      this.context = Context.current();
1✔
1111
    }
1✔
1112

1113
    @Override
1114
    protected ClientCall<ReqT, RespT> delegate() {
1115
      return delegate;
1✔
1116
    }
1117

1118
    @SuppressWarnings("unchecked")
1119
    @Override
1120
    public void start(Listener<RespT> observer, Metadata headers) {
1121
      PickSubchannelArgs args =
1✔
1122
          new PickSubchannelArgsImpl(method, headers, callOptions, NOOP_PICK_DETAILS_CONSUMER);
1✔
1123
      InternalConfigSelector.Result result = configSelector.selectConfig(args);
1✔
1124
      Status status = result.getStatus();
1✔
1125
      if (!status.isOk()) {
1✔
1126
        executeCloseObserverInContext(observer,
1✔
1127
            GrpcUtil.replaceInappropriateControlPlaneStatus(status));
1✔
1128
        delegate = (ClientCall<ReqT, RespT>) NOOP_CALL;
1✔
1129
        return;
1✔
1130
      }
1131
      ClientInterceptor interceptor = result.getInterceptor();
1✔
1132
      ManagedChannelServiceConfig config = (ManagedChannelServiceConfig) result.getConfig();
1✔
1133
      MethodInfo methodInfo = config.getMethodConfig(method);
1✔
1134
      if (methodInfo != null) {
1✔
1135
        callOptions = callOptions.withOption(MethodInfo.KEY, methodInfo);
1✔
1136
      }
1137
      if (interceptor != null) {
1✔
1138
        delegate = interceptor.interceptCall(method, callOptions, channel);
1✔
1139
      } else {
1140
        delegate = channel.newCall(method, callOptions);
×
1141
      }
1142
      delegate.start(observer, headers);
1✔
1143
    }
1✔
1144

1145
    private void executeCloseObserverInContext(
1146
        final Listener<RespT> observer, final Status status) {
1147
      class CloseInContext extends ContextRunnable {
1148
        CloseInContext() {
1✔
1149
          super(context);
1✔
1150
        }
1✔
1151

1152
        @Override
1153
        public void runInContext() {
1154
          observer.onClose(status, new Metadata());
1✔
1155
        }
1✔
1156
      }
1157

1158
      callExecutor.execute(new CloseInContext());
1✔
1159
    }
1✔
1160

1161
    @Override
1162
    public void cancel(@Nullable String message, @Nullable Throwable cause) {
1163
      if (delegate != null) {
×
1164
        delegate.cancel(message, cause);
×
1165
      }
1166
    }
×
1167
  }
1168

1169
  private static final ClientCall<Object, Object> NOOP_CALL = new ClientCall<Object, Object>() {
1✔
1170
    @Override
1171
    public void start(Listener<Object> responseListener, Metadata headers) {}
×
1172

1173
    @Override
1174
    public void request(int numMessages) {}
1✔
1175

1176
    @Override
1177
    public void cancel(String message, Throwable cause) {}
×
1178

1179
    @Override
1180
    public void halfClose() {}
×
1181

1182
    @Override
1183
    public void sendMessage(Object message) {}
×
1184

1185
    // Always returns {@code false}, since this is only used when the startup of the call fails.
1186
    @Override
1187
    public boolean isReady() {
1188
      return false;
×
1189
    }
1190
  };
1191

1192
  /**
1193
   * Terminate the channel if termination conditions are met.
1194
   */
1195
  // Must be run from syncContext
1196
  private void maybeTerminateChannel() {
1197
    if (terminated) {
1✔
1198
      return;
×
1199
    }
1200
    if (shutdown.get() && subchannels.isEmpty() && oobChannels.isEmpty()) {
1✔
1201
      channelLogger.log(ChannelLogLevel.INFO, "Terminated");
1✔
1202
      channelz.removeRootChannel(this);
1✔
1203
      executorPool.returnObject(executor);
1✔
1204
      balancerRpcExecutorHolder.release();
1✔
1205
      offloadExecutorHolder.release();
1✔
1206
      // Release the transport factory so that it can deallocate any resources.
1207
      transportFactory.close();
1✔
1208

1209
      terminated = true;
1✔
1210
      terminatedLatch.countDown();
1✔
1211
    }
1212
  }
1✔
1213

1214
  // Must be called from syncContext
1215
  private void handleInternalSubchannelState(ConnectivityStateInfo newState) {
1216
    if (newState.getState() == TRANSIENT_FAILURE || newState.getState() == IDLE) {
1✔
1217
      refreshNameResolution();
1✔
1218
    }
1219
  }
1✔
1220

1221
  @Override
1222
  @SuppressWarnings("deprecation")
1223
  public ConnectivityState getState(boolean requestConnection) {
1224
    ConnectivityState savedChannelState = channelStateManager.getState();
1✔
1225
    if (requestConnection && savedChannelState == IDLE) {
1✔
1226
      final class RequestConnection implements Runnable {
1✔
1227
        @Override
1228
        public void run() {
1229
          exitIdleMode();
1✔
1230
          if (subchannelPicker != null) {
1✔
1231
            subchannelPicker.requestConnection();
1✔
1232
          }
1233
          if (lbHelper != null) {
1✔
1234
            lbHelper.lb.requestConnection();
1✔
1235
          }
1236
        }
1✔
1237
      }
1238

1239
      syncContext.execute(new RequestConnection());
1✔
1240
    }
1241
    return savedChannelState;
1✔
1242
  }
1243

1244
  @Override
1245
  public void notifyWhenStateChanged(final ConnectivityState source, final Runnable callback) {
1246
    final class NotifyStateChanged implements Runnable {
1✔
1247
      @Override
1248
      public void run() {
1249
        channelStateManager.notifyWhenStateChanged(callback, executor, source);
1✔
1250
      }
1✔
1251
    }
1252

1253
    syncContext.execute(new NotifyStateChanged());
1✔
1254
  }
1✔
1255

1256
  @Override
1257
  public void resetConnectBackoff() {
1258
    final class ResetConnectBackoff implements Runnable {
1✔
1259
      @Override
1260
      public void run() {
1261
        if (shutdown.get()) {
1✔
1262
          return;
1✔
1263
        }
1264
        if (nameResolverStarted) {
1✔
1265
          refreshNameResolution();
1✔
1266
        }
1267
        for (InternalSubchannel subchannel : subchannels) {
1✔
1268
          subchannel.resetConnectBackoff();
1✔
1269
        }
1✔
1270
        for (OobChannel oobChannel : oobChannels) {
1✔
1271
          oobChannel.resetConnectBackoff();
×
1272
        }
×
1273
      }
1✔
1274
    }
1275

1276
    syncContext.execute(new ResetConnectBackoff());
1✔
1277
  }
1✔
1278

1279
  @Override
1280
  public void enterIdle() {
1281
    final class PrepareToLoseNetworkRunnable implements Runnable {
1✔
1282
      @Override
1283
      public void run() {
1284
        if (shutdown.get() || lbHelper == null) {
1✔
1285
          return;
1✔
1286
        }
1287
        cancelIdleTimer(/* permanent= */ false);
1✔
1288
        enterIdleMode();
1✔
1289
      }
1✔
1290
    }
1291

1292
    syncContext.execute(new PrepareToLoseNetworkRunnable());
1✔
1293
  }
1✔
1294

1295
  /**
1296
   * A registry that prevents channel shutdown from killing existing retry attempts that are in
1297
   * backoff.
1298
   */
1299
  private final class UncommittedRetriableStreamsRegistry {
1✔
1300
    // TODO(zdapeng): This means we would acquire a lock for each new retry-able stream,
1301
    // it's worthwhile to look for a lock-free approach.
1302
    final Object lock = new Object();
1✔
1303

1304
    @GuardedBy("lock")
1✔
1305
    Collection<ClientStream> uncommittedRetriableStreams = new HashSet<>();
1306

1307
    @GuardedBy("lock")
1308
    Status shutdownStatus;
1309

1310
    void onShutdown(Status reason) {
1311
      boolean shouldShutdownDelayedTransport = false;
1✔
1312
      synchronized (lock) {
1✔
1313
        if (shutdownStatus != null) {
1✔
1314
          return;
1✔
1315
        }
1316
        shutdownStatus = reason;
1✔
1317
        // Keep the delayedTransport open until there is no more uncommitted streams, b/c those
1318
        // retriable streams, which may be in backoff and not using any transport, are already
1319
        // started RPCs.
1320
        if (uncommittedRetriableStreams.isEmpty()) {
1✔
1321
          shouldShutdownDelayedTransport = true;
1✔
1322
        }
1323
      }
1✔
1324

1325
      if (shouldShutdownDelayedTransport) {
1✔
1326
        delayedTransport.shutdown(reason);
1✔
1327
      }
1328
    }
1✔
1329

1330
    void onShutdownNow(Status reason) {
1331
      onShutdown(reason);
1✔
1332
      Collection<ClientStream> streams;
1333

1334
      synchronized (lock) {
1✔
1335
        streams = new ArrayList<>(uncommittedRetriableStreams);
1✔
1336
      }
1✔
1337

1338
      for (ClientStream stream : streams) {
1✔
1339
        stream.cancel(reason);
1✔
1340
      }
1✔
1341
      delayedTransport.shutdownNow(reason);
1✔
1342
    }
1✔
1343

1344
    /**
1345
     * Registers a RetriableStream and return null if not shutdown, otherwise just returns the
1346
     * shutdown Status.
1347
     */
1348
    @Nullable
1349
    Status add(RetriableStream<?> retriableStream) {
1350
      synchronized (lock) {
1✔
1351
        if (shutdownStatus != null) {
1✔
1352
          return shutdownStatus;
1✔
1353
        }
1354
        uncommittedRetriableStreams.add(retriableStream);
1✔
1355
        return null;
1✔
1356
      }
1357
    }
1358

1359
    void remove(RetriableStream<?> retriableStream) {
1360
      Status shutdownStatusCopy = null;
1✔
1361

1362
      synchronized (lock) {
1✔
1363
        uncommittedRetriableStreams.remove(retriableStream);
1✔
1364
        if (uncommittedRetriableStreams.isEmpty()) {
1✔
1365
          shutdownStatusCopy = shutdownStatus;
1✔
1366
          // Because retriable transport is long-lived, we take this opportunity to down-size the
1367
          // hashmap.
1368
          uncommittedRetriableStreams = new HashSet<>();
1✔
1369
        }
1370
      }
1✔
1371

1372
      if (shutdownStatusCopy != null) {
1✔
1373
        delayedTransport.shutdown(shutdownStatusCopy);
1✔
1374
      }
1375
    }
1✔
1376
  }
1377

1378
  private final class LbHelperImpl extends LoadBalancer.Helper {
1✔
1379
    AutoConfiguredLoadBalancer lb;
1380

1381
    @Override
1382
    public AbstractSubchannel createSubchannel(CreateSubchannelArgs args) {
1383
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1384
      // No new subchannel should be created after load balancer has been shutdown.
1385
      checkState(!terminating, "Channel is being terminated");
1✔
1386
      return new SubchannelImpl(args);
1✔
1387
    }
1388

1389
    @Override
1390
    public void updateBalancingState(
1391
        final ConnectivityState newState, final SubchannelPicker newPicker) {
1392
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1393
      checkNotNull(newState, "newState");
1✔
1394
      checkNotNull(newPicker, "newPicker");
1✔
1395
      final class UpdateBalancingState implements Runnable {
1✔
1396
        @Override
1397
        public void run() {
1398
          if (LbHelperImpl.this != lbHelper) {
1✔
1399
            return;
1✔
1400
          }
1401
          updateSubchannelPicker(newPicker);
1✔
1402
          // It's not appropriate to report SHUTDOWN state from lb.
1403
          // Ignore the case of newState == SHUTDOWN for now.
1404
          if (newState != SHUTDOWN) {
1✔
1405
            channelLogger.log(
1✔
1406
                ChannelLogLevel.INFO, "Entering {0} state with picker: {1}", newState, newPicker);
1407
            channelStateManager.gotoState(newState);
1✔
1408
          }
1409
        }
1✔
1410
      }
1411

1412
      syncContext.execute(new UpdateBalancingState());
1✔
1413
    }
1✔
1414

1415
    @Override
1416
    public void refreshNameResolution() {
1417
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1418
      final class LoadBalancerRefreshNameResolution implements Runnable {
1✔
1419
        @Override
1420
        public void run() {
1421
          ManagedChannelImpl.this.refreshNameResolution();
1✔
1422
        }
1✔
1423
      }
1424

1425
      syncContext.execute(new LoadBalancerRefreshNameResolution());
1✔
1426
    }
1✔
1427

1428
    @Override
1429
    public ManagedChannel createOobChannel(EquivalentAddressGroup addressGroup, String authority) {
1430
      return createOobChannel(Collections.singletonList(addressGroup), authority);
×
1431
    }
1432

1433
    @Override
1434
    public ManagedChannel createOobChannel(List<EquivalentAddressGroup> addressGroup,
1435
        String authority) {
1436
      // TODO(ejona): can we be even stricter? Like terminating?
1437
      checkState(!terminated, "Channel is terminated");
1✔
1438
      long oobChannelCreationTime = timeProvider.currentTimeNanos();
1✔
1439
      InternalLogId oobLogId = InternalLogId.allocate("OobChannel", /*details=*/ null);
1✔
1440
      InternalLogId subchannelLogId =
1✔
1441
          InternalLogId.allocate("Subchannel-OOB", /*details=*/ authority);
1✔
1442
      ChannelTracer oobChannelTracer =
1✔
1443
          new ChannelTracer(
1444
              oobLogId, maxTraceEvents, oobChannelCreationTime,
1✔
1445
              "OobChannel for " + addressGroup);
1446
      final OobChannel oobChannel = new OobChannel(
1✔
1447
          authority, balancerRpcExecutorPool, oobTransportFactory.getScheduledExecutorService(),
1✔
1448
          syncContext, callTracerFactory.create(), oobChannelTracer, channelz, timeProvider);
1✔
1449
      channelTracer.reportEvent(new ChannelTrace.Event.Builder()
1✔
1450
          .setDescription("Child OobChannel created")
1✔
1451
          .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
1✔
1452
          .setTimestampNanos(oobChannelCreationTime)
1✔
1453
          .setChannelRef(oobChannel)
1✔
1454
          .build());
1✔
1455
      ChannelTracer subchannelTracer =
1✔
1456
          new ChannelTracer(subchannelLogId, maxTraceEvents, oobChannelCreationTime,
1✔
1457
              "Subchannel for " + addressGroup);
1458
      ChannelLogger subchannelLogger = new ChannelLoggerImpl(subchannelTracer, timeProvider);
1✔
1459
      final class ManagedOobChannelCallback extends InternalSubchannel.Callback {
1✔
1460
        @Override
1461
        void onTerminated(InternalSubchannel is) {
1462
          oobChannels.remove(oobChannel);
1✔
1463
          channelz.removeSubchannel(is);
1✔
1464
          oobChannel.handleSubchannelTerminated();
1✔
1465
          maybeTerminateChannel();
1✔
1466
        }
1✔
1467

1468
        @Override
1469
        void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) {
1470
          // TODO(chengyuanzhang): change to let LB policies explicitly manage OOB channel's
1471
          //  state and refresh name resolution if necessary.
1472
          handleInternalSubchannelState(newState);
1✔
1473
          oobChannel.handleSubchannelStateChange(newState);
1✔
1474
        }
1✔
1475
      }
1476

1477
      final InternalSubchannel internalSubchannel = new InternalSubchannel(
1✔
1478
          addressGroup,
1479
          authority, userAgent, backoffPolicyProvider, oobTransportFactory,
1✔
1480
          oobTransportFactory.getScheduledExecutorService(), stopwatchSupplier, syncContext,
1✔
1481
          // All callback methods are run from syncContext
1482
          new ManagedOobChannelCallback(),
1483
          channelz,
1✔
1484
          callTracerFactory.create(),
1✔
1485
          subchannelTracer,
1486
          subchannelLogId,
1487
          subchannelLogger,
1488
          transportFilters);
1✔
1489
      oobChannelTracer.reportEvent(new ChannelTrace.Event.Builder()
1✔
1490
          .setDescription("Child Subchannel created")
1✔
1491
          .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
1✔
1492
          .setTimestampNanos(oobChannelCreationTime)
1✔
1493
          .setSubchannelRef(internalSubchannel)
1✔
1494
          .build());
1✔
1495
      channelz.addSubchannel(oobChannel);
1✔
1496
      channelz.addSubchannel(internalSubchannel);
1✔
1497
      oobChannel.setSubchannel(internalSubchannel);
1✔
1498
      final class AddOobChannel implements Runnable {
1✔
1499
        @Override
1500
        public void run() {
1501
          if (terminating) {
1✔
1502
            oobChannel.shutdown();
×
1503
          }
1504
          if (!terminated) {
1✔
1505
            // If channel has not terminated, it will track the subchannel and block termination
1506
            // for it.
1507
            oobChannels.add(oobChannel);
1✔
1508
          }
1509
        }
1✔
1510
      }
1511

1512
      syncContext.execute(new AddOobChannel());
1✔
1513
      return oobChannel;
1✔
1514
    }
1515

1516
    @Deprecated
1517
    @Override
1518
    public ManagedChannelBuilder<?> createResolvingOobChannelBuilder(String target) {
1519
      return createResolvingOobChannelBuilder(target, new DefaultChannelCreds())
1✔
1520
          // Override authority to keep the old behavior.
1521
          // createResolvingOobChannelBuilder(String target) will be deleted soon.
1522
          .overrideAuthority(getAuthority());
1✔
1523
    }
1524

1525
    // TODO(creamsoup) prevent main channel to shutdown if oob channel is not terminated
1526
    // TODO(zdapeng) register the channel as a subchannel of the parent channel in channelz.
1527
    @Override
1528
    public ManagedChannelBuilder<?> createResolvingOobChannelBuilder(
1529
        final String target, final ChannelCredentials channelCreds) {
1530
      checkNotNull(channelCreds, "channelCreds");
1✔
1531

1532
      final class ResolvingOobChannelBuilder
1533
          extends ForwardingChannelBuilder2<ResolvingOobChannelBuilder> {
1534
        final ManagedChannelBuilder<?> delegate;
1535

1536
        ResolvingOobChannelBuilder() {
1✔
1537
          final ClientTransportFactory transportFactory;
1538
          CallCredentials callCredentials;
1539
          if (channelCreds instanceof DefaultChannelCreds) {
1✔
1540
            transportFactory = originalTransportFactory;
1✔
1541
            callCredentials = null;
1✔
1542
          } else {
1543
            SwapChannelCredentialsResult swapResult =
1✔
1544
                originalTransportFactory.swapChannelCredentials(channelCreds);
1✔
1545
            if (swapResult == null) {
1✔
1546
              delegate = Grpc.newChannelBuilder(target, channelCreds);
×
1547
              return;
×
1548
            } else {
1549
              transportFactory = swapResult.transportFactory;
1✔
1550
              callCredentials = swapResult.callCredentials;
1✔
1551
            }
1552
          }
1553
          ClientTransportFactoryBuilder transportFactoryBuilder =
1✔
1554
              new ClientTransportFactoryBuilder() {
1✔
1555
                @Override
1556
                public ClientTransportFactory buildClientTransportFactory() {
1557
                  return transportFactory;
1✔
1558
                }
1559
              };
1560
          delegate = new ManagedChannelImplBuilder(
1✔
1561
              target,
1562
              channelCreds,
1563
              callCredentials,
1564
              transportFactoryBuilder,
1565
              new FixedPortProvider(nameResolverArgs.getDefaultPort()))
1✔
1566
              .nameResolverRegistry(nameResolverRegistry);
1✔
1567
        }
1✔
1568

1569
        @Override
1570
        protected ManagedChannelBuilder<?> delegate() {
1571
          return delegate;
1✔
1572
        }
1573
      }
1574

1575
      checkState(!terminated, "Channel is terminated");
1✔
1576

1577
      @SuppressWarnings("deprecation")
1578
      ResolvingOobChannelBuilder builder = new ResolvingOobChannelBuilder();
1✔
1579

1580
      return builder
1✔
1581
          // TODO(zdapeng): executors should not outlive the parent channel.
1582
          .executor(executor)
1✔
1583
          .offloadExecutor(offloadExecutorHolder.getExecutor())
1✔
1584
          .maxTraceEvents(maxTraceEvents)
1✔
1585
          .proxyDetector(nameResolverArgs.getProxyDetector())
1✔
1586
          .userAgent(userAgent);
1✔
1587
    }
1588

1589
    @Override
1590
    public ChannelCredentials getUnsafeChannelCredentials() {
1591
      if (originalChannelCreds == null) {
1✔
1592
        return new DefaultChannelCreds();
1✔
1593
      }
1594
      return originalChannelCreds;
×
1595
    }
1596

1597
    @Override
1598
    public void updateOobChannelAddresses(ManagedChannel channel, EquivalentAddressGroup eag) {
1599
      updateOobChannelAddresses(channel, Collections.singletonList(eag));
×
1600
    }
×
1601

1602
    @Override
1603
    public void updateOobChannelAddresses(ManagedChannel channel,
1604
        List<EquivalentAddressGroup> eag) {
1605
      checkArgument(channel instanceof OobChannel,
1✔
1606
          "channel must have been returned from createOobChannel");
1607
      ((OobChannel) channel).updateAddresses(eag);
1✔
1608
    }
1✔
1609

1610
    @Override
1611
    public String getAuthority() {
1612
      return ManagedChannelImpl.this.authority();
1✔
1613
    }
1614

1615
    @Override
1616
    public String getChannelTarget() {
1617
      return targetUri.toString();
1✔
1618
    }
1619

1620
    @Override
1621
    public SynchronizationContext getSynchronizationContext() {
1622
      return syncContext;
1✔
1623
    }
1624

1625
    @Override
1626
    public ScheduledExecutorService getScheduledExecutorService() {
1627
      return scheduledExecutor;
1✔
1628
    }
1629

1630
    @Override
1631
    public ChannelLogger getChannelLogger() {
1632
      return channelLogger;
1✔
1633
    }
1634

1635
    @Override
1636
    public NameResolver.Args getNameResolverArgs() {
1637
      return nameResolverArgs;
1✔
1638
    }
1639

1640
    @Override
1641
    public NameResolverRegistry getNameResolverRegistry() {
1642
      return nameResolverRegistry;
1✔
1643
    }
1644

1645
    @Override
1646
    public MetricRecorder getMetricRecorder() {
1647
      return metricRecorder;
1✔
1648
    }
1649

1650
    /**
1651
     * A placeholder for channel creds if user did not specify channel creds for the channel.
1652
     */
1653
    // TODO(zdapeng): get rid of this class and let all ChannelBuilders always provide a non-null
1654
    //     channel creds.
1655
    final class DefaultChannelCreds extends ChannelCredentials {
1✔
1656
      @Override
1657
      public ChannelCredentials withoutBearerTokens() {
1658
        return this;
×
1659
      }
1660
    }
1661
  }
1662

1663
  final class NameResolverListener extends NameResolver.Listener2 {
1664
    final LbHelperImpl helper;
1665
    final NameResolver resolver;
1666

1667
    NameResolverListener(LbHelperImpl helperImpl, NameResolver resolver) {
1✔
1668
      this.helper = checkNotNull(helperImpl, "helperImpl");
1✔
1669
      this.resolver = checkNotNull(resolver, "resolver");
1✔
1670
    }
1✔
1671

1672
    @Override
1673
    public void onResult(final ResolutionResult resolutionResult) {
1674
      final class NamesResolved implements Runnable {
1✔
1675

1676
        @Override
1677
        public void run() {
1678
          Status status = onResult2(resolutionResult);
1✔
1679
          ResolutionResultListener resolutionResultListener = resolutionResult.getAttributes()
1✔
1680
              .get(RetryingNameResolver.RESOLUTION_RESULT_LISTENER_KEY);
1✔
1681
          resolutionResultListener.resolutionAttempted(status);
1✔
1682
        }
1✔
1683
      }
1684

1685
      syncContext.execute(new NamesResolved());
1✔
1686
    }
1✔
1687

1688
    @SuppressWarnings("ReferenceEquality")
1689
    @Override
1690
    public Status onResult2(final ResolutionResult resolutionResult) {
1691
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1692
      if (ManagedChannelImpl.this.nameResolver != resolver) {
1✔
1693
        return Status.OK;
1✔
1694
      }
1695

1696
      List<EquivalentAddressGroup> servers = resolutionResult.getAddresses();
1✔
1697
      channelLogger.log(
1✔
1698
          ChannelLogLevel.DEBUG,
1699
          "Resolved address: {0}, config={1}",
1700
          servers,
1701
          resolutionResult.getAttributes());
1✔
1702

1703
      if (lastResolutionState != ResolutionState.SUCCESS) {
1✔
1704
        channelLogger.log(ChannelLogLevel.INFO, "Address resolved: {0}", servers);
1✔
1705
        lastResolutionState = ResolutionState.SUCCESS;
1✔
1706
      }
1707

1708
      ConfigOrError configOrError = resolutionResult.getServiceConfig();
1✔
1709
      InternalConfigSelector resolvedConfigSelector =
1✔
1710
          resolutionResult.getAttributes().get(InternalConfigSelector.KEY);
1✔
1711
      ManagedChannelServiceConfig validServiceConfig =
1712
          configOrError != null && configOrError.getConfig() != null
1✔
1713
              ? (ManagedChannelServiceConfig) configOrError.getConfig()
1✔
1714
              : null;
1✔
1715
      Status serviceConfigError = configOrError != null ? configOrError.getError() : null;
1✔
1716

1717
      ManagedChannelServiceConfig effectiveServiceConfig;
1718
      if (!lookUpServiceConfig) {
1✔
1719
        if (validServiceConfig != null) {
1✔
1720
          channelLogger.log(
1✔
1721
              ChannelLogLevel.INFO,
1722
              "Service config from name resolver discarded by channel settings");
1723
        }
1724
        effectiveServiceConfig =
1725
            defaultServiceConfig == null ? EMPTY_SERVICE_CONFIG : defaultServiceConfig;
1✔
1726
        if (resolvedConfigSelector != null) {
1✔
1727
          channelLogger.log(
1✔
1728
              ChannelLogLevel.INFO,
1729
              "Config selector from name resolver discarded by channel settings");
1730
        }
1731
        realChannel.updateConfigSelector(effectiveServiceConfig.getDefaultConfigSelector());
1✔
1732
      } else {
1733
        // Try to use config if returned from name resolver
1734
        // Otherwise, try to use the default config if available
1735
        if (validServiceConfig != null) {
1✔
1736
          effectiveServiceConfig = validServiceConfig;
1✔
1737
          if (resolvedConfigSelector != null) {
1✔
1738
            realChannel.updateConfigSelector(resolvedConfigSelector);
1✔
1739
            if (effectiveServiceConfig.getDefaultConfigSelector() != null) {
1✔
1740
              channelLogger.log(
×
1741
                  ChannelLogLevel.DEBUG,
1742
                  "Method configs in service config will be discarded due to presence of"
1743
                      + "config-selector");
1744
            }
1745
          } else {
1746
            realChannel.updateConfigSelector(effectiveServiceConfig.getDefaultConfigSelector());
1✔
1747
          }
1748
        } else if (defaultServiceConfig != null) {
1✔
1749
          effectiveServiceConfig = defaultServiceConfig;
1✔
1750
          realChannel.updateConfigSelector(effectiveServiceConfig.getDefaultConfigSelector());
1✔
1751
          channelLogger.log(
1✔
1752
              ChannelLogLevel.INFO,
1753
              "Received no service config, using default service config");
1754
        } else if (serviceConfigError != null) {
1✔
1755
          if (!serviceConfigUpdated) {
1✔
1756
            // First DNS lookup has invalid service config, and cannot fall back to default
1757
            channelLogger.log(
1✔
1758
                ChannelLogLevel.INFO,
1759
                "Fallback to error due to invalid first service config without default config");
1760
            // This error could be an "inappropriate" control plane error that should not bleed
1761
            // through to client code using gRPC. We let them flow through here to the LB as
1762
            // we later check for these error codes when investigating pick results in
1763
            // GrpcUtil.getTransportFromPickResult().
1764
            onError(configOrError.getError());
1✔
1765
            return configOrError.getError();
1✔
1766
          } else {
1767
            effectiveServiceConfig = lastServiceConfig;
1✔
1768
          }
1769
        } else {
1770
          effectiveServiceConfig = EMPTY_SERVICE_CONFIG;
1✔
1771
          realChannel.updateConfigSelector(null);
1✔
1772
        }
1773
        if (!effectiveServiceConfig.equals(lastServiceConfig)) {
1✔
1774
          channelLogger.log(
1✔
1775
              ChannelLogLevel.INFO,
1776
              "Service config changed{0}",
1777
              effectiveServiceConfig == EMPTY_SERVICE_CONFIG ? " to empty" : "");
1✔
1778
          lastServiceConfig = effectiveServiceConfig;
1✔
1779
          transportProvider.throttle = effectiveServiceConfig.getRetryThrottling();
1✔
1780
        }
1781

1782
        try {
1783
          // TODO(creamsoup): when `servers` is empty and lastResolutionStateCopy == SUCCESS
1784
          //  and lbNeedAddress, it shouldn't call the handleServiceConfigUpdate. But,
1785
          //  lbNeedAddress is not deterministic
1786
          serviceConfigUpdated = true;
1✔
1787
        } catch (RuntimeException re) {
×
1788
          logger.log(
×
1789
              Level.WARNING,
1790
              "[" + getLogId() + "] Unexpected exception from parsing service config",
×
1791
              re);
1792
        }
1✔
1793
      }
1794

1795
      Attributes effectiveAttrs = resolutionResult.getAttributes();
1✔
1796
      // Call LB only if it's not shutdown.  If LB is shutdown, lbHelper won't match.
1797
      if (NameResolverListener.this.helper == ManagedChannelImpl.this.lbHelper) {
1✔
1798
        Attributes.Builder attrBuilder =
1✔
1799
            effectiveAttrs.toBuilder().discard(InternalConfigSelector.KEY);
1✔
1800
        Map<String, ?> healthCheckingConfig =
1✔
1801
            effectiveServiceConfig.getHealthCheckingConfig();
1✔
1802
        if (healthCheckingConfig != null) {
1✔
1803
          attrBuilder
1✔
1804
              .set(LoadBalancer.ATTR_HEALTH_CHECKING_CONFIG, healthCheckingConfig)
1✔
1805
              .build();
1✔
1806
        }
1807
        Attributes attributes = attrBuilder.build();
1✔
1808

1809
        return helper.lb.tryAcceptResolvedAddresses(
1✔
1810
            ResolvedAddresses.newBuilder()
1✔
1811
                .setAddresses(servers)
1✔
1812
                .setAttributes(attributes)
1✔
1813
                .setLoadBalancingPolicyConfig(effectiveServiceConfig.getLoadBalancingConfig())
1✔
1814
                .build());
1✔
1815
      }
1816
      return Status.OK;
×
1817
    }
1818

1819
    @Override
1820
    public void onError(final Status error) {
1821
      checkArgument(!error.isOk(), "the error status must not be OK");
1✔
1822
      final class NameResolverErrorHandler implements Runnable {
1✔
1823
        @Override
1824
        public void run() {
1825
          handleErrorInSyncContext(error);
1✔
1826
        }
1✔
1827
      }
1828

1829
      syncContext.execute(new NameResolverErrorHandler());
1✔
1830
    }
1✔
1831

1832
    private void handleErrorInSyncContext(Status error) {
1833
      logger.log(Level.WARNING, "[{0}] Failed to resolve name. status={1}",
1✔
1834
          new Object[] {getLogId(), error});
1✔
1835
      realChannel.onConfigError();
1✔
1836
      if (lastResolutionState != ResolutionState.ERROR) {
1✔
1837
        channelLogger.log(ChannelLogLevel.WARNING, "Failed to resolve name: {0}", error);
1✔
1838
        lastResolutionState = ResolutionState.ERROR;
1✔
1839
      }
1840
      // Call LB only if it's not shutdown.  If LB is shutdown, lbHelper won't match.
1841
      if (NameResolverListener.this.helper != ManagedChannelImpl.this.lbHelper) {
1✔
1842
        return;
1✔
1843
      }
1844

1845
      helper.lb.handleNameResolutionError(error);
1✔
1846
    }
1✔
1847
  }
1848

1849
  private final class SubchannelImpl extends AbstractSubchannel {
1850
    final CreateSubchannelArgs args;
1851
    final InternalLogId subchannelLogId;
1852
    final ChannelLoggerImpl subchannelLogger;
1853
    final ChannelTracer subchannelTracer;
1854
    List<EquivalentAddressGroup> addressGroups;
1855
    InternalSubchannel subchannel;
1856
    boolean started;
1857
    boolean shutdown;
1858
    ScheduledHandle delayedShutdownTask;
1859

1860
    SubchannelImpl(CreateSubchannelArgs args) {
1✔
1861
      checkNotNull(args, "args");
1✔
1862
      addressGroups = args.getAddresses();
1✔
1863
      if (authorityOverride != null) {
1✔
1864
        List<EquivalentAddressGroup> eagsWithoutOverrideAttr =
1✔
1865
            stripOverrideAuthorityAttributes(args.getAddresses());
1✔
1866
        args = args.toBuilder().setAddresses(eagsWithoutOverrideAttr).build();
1✔
1867
      }
1868
      this.args = args;
1✔
1869
      subchannelLogId = InternalLogId.allocate("Subchannel", /*details=*/ authority());
1✔
1870
      subchannelTracer = new ChannelTracer(
1✔
1871
          subchannelLogId, maxTraceEvents, timeProvider.currentTimeNanos(),
1✔
1872
          "Subchannel for " + args.getAddresses());
1✔
1873
      subchannelLogger = new ChannelLoggerImpl(subchannelTracer, timeProvider);
1✔
1874
    }
1✔
1875

1876
    @Override
1877
    public void start(final SubchannelStateListener listener) {
1878
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1879
      checkState(!started, "already started");
1✔
1880
      checkState(!shutdown, "already shutdown");
1✔
1881
      checkState(!terminating, "Channel is being terminated");
1✔
1882
      started = true;
1✔
1883
      final class ManagedInternalSubchannelCallback extends InternalSubchannel.Callback {
1✔
1884
        // All callbacks are run in syncContext
1885
        @Override
1886
        void onTerminated(InternalSubchannel is) {
1887
          subchannels.remove(is);
1✔
1888
          channelz.removeSubchannel(is);
1✔
1889
          maybeTerminateChannel();
1✔
1890
        }
1✔
1891

1892
        @Override
1893
        void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) {
1894
          checkState(listener != null, "listener is null");
1✔
1895
          listener.onSubchannelState(newState);
1✔
1896
        }
1✔
1897

1898
        @Override
1899
        void onInUse(InternalSubchannel is) {
1900
          inUseStateAggregator.updateObjectInUse(is, true);
1✔
1901
        }
1✔
1902

1903
        @Override
1904
        void onNotInUse(InternalSubchannel is) {
1905
          inUseStateAggregator.updateObjectInUse(is, false);
1✔
1906
        }
1✔
1907
      }
1908

1909
      final InternalSubchannel internalSubchannel = new InternalSubchannel(
1✔
1910
          args.getAddresses(),
1✔
1911
          authority(),
1✔
1912
          userAgent,
1✔
1913
          backoffPolicyProvider,
1✔
1914
          transportFactory,
1✔
1915
          transportFactory.getScheduledExecutorService(),
1✔
1916
          stopwatchSupplier,
1✔
1917
          syncContext,
1918
          new ManagedInternalSubchannelCallback(),
1919
          channelz,
1✔
1920
          callTracerFactory.create(),
1✔
1921
          subchannelTracer,
1922
          subchannelLogId,
1923
          subchannelLogger,
1924
          transportFilters);
1✔
1925

1926
      channelTracer.reportEvent(new ChannelTrace.Event.Builder()
1✔
1927
          .setDescription("Child Subchannel started")
1✔
1928
          .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
1✔
1929
          .setTimestampNanos(timeProvider.currentTimeNanos())
1✔
1930
          .setSubchannelRef(internalSubchannel)
1✔
1931
          .build());
1✔
1932

1933
      this.subchannel = internalSubchannel;
1✔
1934
      channelz.addSubchannel(internalSubchannel);
1✔
1935
      subchannels.add(internalSubchannel);
1✔
1936
    }
1✔
1937

1938
    @Override
1939
    InternalInstrumented<ChannelStats> getInstrumentedInternalSubchannel() {
1940
      checkState(started, "not started");
1✔
1941
      return subchannel;
1✔
1942
    }
1943

1944
    @Override
1945
    public void shutdown() {
1946
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1947
      if (subchannel == null) {
1✔
1948
        // start() was not successful
1949
        shutdown = true;
×
1950
        return;
×
1951
      }
1952
      if (shutdown) {
1✔
1953
        if (terminating && delayedShutdownTask != null) {
1✔
1954
          // shutdown() was previously called when terminating == false, thus a delayed shutdown()
1955
          // was scheduled.  Now since terminating == true, We should expedite the shutdown.
1956
          delayedShutdownTask.cancel();
×
1957
          delayedShutdownTask = null;
×
1958
          // Will fall through to the subchannel.shutdown() at the end.
1959
        } else {
1960
          return;
1✔
1961
        }
1962
      } else {
1963
        shutdown = true;
1✔
1964
      }
1965
      // Add a delay to shutdown to deal with the race between 1) a transport being picked and
1966
      // newStream() being called on it, and 2) its Subchannel is shut down by LoadBalancer (e.g.,
1967
      // because of address change, or because LoadBalancer is shutdown by Channel entering idle
1968
      // mode). If (2) wins, the app will see a spurious error. We work around this by delaying
1969
      // shutdown of Subchannel for a few seconds here.
1970
      //
1971
      // TODO(zhangkun83): consider a better approach
1972
      // (https://github.com/grpc/grpc-java/issues/2562).
1973
      if (!terminating) {
1✔
1974
        final class ShutdownSubchannel implements Runnable {
1✔
1975
          @Override
1976
          public void run() {
1977
            subchannel.shutdown(SUBCHANNEL_SHUTDOWN_STATUS);
1✔
1978
          }
1✔
1979
        }
1980

1981
        delayedShutdownTask = syncContext.schedule(
1✔
1982
            new LogExceptionRunnable(new ShutdownSubchannel()),
1983
            SUBCHANNEL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS,
1984
            transportFactory.getScheduledExecutorService());
1✔
1985
        return;
1✔
1986
      }
1987
      // When terminating == true, no more real streams will be created. It's safe and also
1988
      // desirable to shutdown timely.
1989
      subchannel.shutdown(SHUTDOWN_STATUS);
1✔
1990
    }
1✔
1991

1992
    @Override
1993
    public void requestConnection() {
1994
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1995
      checkState(started, "not started");
1✔
1996
      subchannel.obtainActiveTransport();
1✔
1997
    }
1✔
1998

1999
    @Override
2000
    public List<EquivalentAddressGroup> getAllAddresses() {
2001
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
2002
      checkState(started, "not started");
1✔
2003
      return addressGroups;
1✔
2004
    }
2005

2006
    @Override
2007
    public Attributes getAttributes() {
2008
      return args.getAttributes();
1✔
2009
    }
2010

2011
    @Override
2012
    public String toString() {
2013
      return subchannelLogId.toString();
1✔
2014
    }
2015

2016
    @Override
2017
    public Channel asChannel() {
2018
      checkState(started, "not started");
1✔
2019
      return new SubchannelChannel(
1✔
2020
          subchannel, balancerRpcExecutorHolder.getExecutor(),
1✔
2021
          transportFactory.getScheduledExecutorService(),
1✔
2022
          callTracerFactory.create(),
1✔
2023
          new AtomicReference<InternalConfigSelector>(null));
2024
    }
2025

2026
    @Override
2027
    public Object getInternalSubchannel() {
2028
      checkState(started, "Subchannel is not started");
1✔
2029
      return subchannel;
1✔
2030
    }
2031

2032
    @Override
2033
    public ChannelLogger getChannelLogger() {
2034
      return subchannelLogger;
1✔
2035
    }
2036

2037
    @Override
2038
    public void updateAddresses(List<EquivalentAddressGroup> addrs) {
2039
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
2040
      addressGroups = addrs;
1✔
2041
      if (authorityOverride != null) {
1✔
2042
        addrs = stripOverrideAuthorityAttributes(addrs);
1✔
2043
      }
2044
      subchannel.updateAddresses(addrs);
1✔
2045
    }
1✔
2046

2047
    private List<EquivalentAddressGroup> stripOverrideAuthorityAttributes(
2048
        List<EquivalentAddressGroup> eags) {
2049
      List<EquivalentAddressGroup> eagsWithoutOverrideAttr = new ArrayList<>();
1✔
2050
      for (EquivalentAddressGroup eag : eags) {
1✔
2051
        EquivalentAddressGroup eagWithoutOverrideAttr = new EquivalentAddressGroup(
1✔
2052
            eag.getAddresses(),
1✔
2053
            eag.getAttributes().toBuilder().discard(ATTR_AUTHORITY_OVERRIDE).build());
1✔
2054
        eagsWithoutOverrideAttr.add(eagWithoutOverrideAttr);
1✔
2055
      }
1✔
2056
      return Collections.unmodifiableList(eagsWithoutOverrideAttr);
1✔
2057
    }
2058
  }
2059

2060
  @Override
2061
  public String toString() {
2062
    return MoreObjects.toStringHelper(this)
1✔
2063
        .add("logId", logId.getId())
1✔
2064
        .add("target", target)
1✔
2065
        .toString();
1✔
2066
  }
2067

2068
  /**
2069
   * Called from syncContext.
2070
   */
2071
  private final class DelayedTransportListener implements ManagedClientTransport.Listener {
1✔
2072
    @Override
2073
    public void transportShutdown(Status s) {
2074
      checkState(shutdown.get(), "Channel must have been shut down");
1✔
2075
    }
1✔
2076

2077
    @Override
2078
    public void transportReady() {
2079
      // Don't care
2080
    }
×
2081

2082
    @Override
2083
    public Attributes filterTransport(Attributes attributes) {
2084
      return attributes;
×
2085
    }
2086

2087
    @Override
2088
    public void transportInUse(final boolean inUse) {
2089
      inUseStateAggregator.updateObjectInUse(delayedTransport, inUse);
1✔
2090
      if (inUse) {
1✔
2091
        // It's possible to be in idle mode while inUseStateAggregator is in-use, if one of the
2092
        // subchannels is in use. But we should never be in idle mode when delayed transport is in
2093
        // use.
2094
        exitIdleMode();
1✔
2095
      }
2096
    }
1✔
2097

2098
    @Override
2099
    public void transportTerminated() {
2100
      checkState(shutdown.get(), "Channel must have been shut down");
1✔
2101
      terminating = true;
1✔
2102
      shutdownNameResolverAndLoadBalancer(false);
1✔
2103
      // No need to call channelStateManager since we are already in SHUTDOWN state.
2104
      // Until LoadBalancer is shutdown, it may still create new subchannels.  We catch them
2105
      // here.
2106
      maybeShutdownNowSubchannels();
1✔
2107
      maybeTerminateChannel();
1✔
2108
    }
1✔
2109
  }
2110

2111
  /**
2112
   * Must be accessed from syncContext.
2113
   */
2114
  private final class IdleModeStateAggregator extends InUseStateAggregator<Object> {
1✔
2115
    @Override
2116
    protected void handleInUse() {
2117
      exitIdleMode();
1✔
2118
    }
1✔
2119

2120
    @Override
2121
    protected void handleNotInUse() {
2122
      if (shutdown.get()) {
1✔
2123
        return;
1✔
2124
      }
2125
      rescheduleIdleTimer();
1✔
2126
    }
1✔
2127
  }
2128

2129
  /**
2130
   * Lazily request for Executor from an executor pool.
2131
   * Also act as an Executor directly to simply run a cmd
2132
   */
2133
  @VisibleForTesting
2134
  static final class ExecutorHolder implements Executor {
2135
    private final ObjectPool<? extends Executor> pool;
2136
    private Executor executor;
2137

2138
    ExecutorHolder(ObjectPool<? extends Executor> executorPool) {
1✔
2139
      this.pool = checkNotNull(executorPool, "executorPool");
1✔
2140
    }
1✔
2141

2142
    synchronized Executor getExecutor() {
2143
      if (executor == null) {
1✔
2144
        executor = checkNotNull(pool.getObject(), "%s.getObject()", executor);
1✔
2145
      }
2146
      return executor;
1✔
2147
    }
2148

2149
    synchronized void release() {
2150
      if (executor != null) {
1✔
2151
        executor = pool.returnObject(executor);
1✔
2152
      }
2153
    }
1✔
2154

2155
    @Override
2156
    public void execute(Runnable command) {
2157
      getExecutor().execute(command);
1✔
2158
    }
1✔
2159
  }
2160

2161
  private static final class RestrictedScheduledExecutor implements ScheduledExecutorService {
2162
    final ScheduledExecutorService delegate;
2163

2164
    private RestrictedScheduledExecutor(ScheduledExecutorService delegate) {
1✔
2165
      this.delegate = checkNotNull(delegate, "delegate");
1✔
2166
    }
1✔
2167

2168
    @Override
2169
    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
2170
      return delegate.schedule(callable, delay, unit);
×
2171
    }
2172

2173
    @Override
2174
    public ScheduledFuture<?> schedule(Runnable cmd, long delay, TimeUnit unit) {
2175
      return delegate.schedule(cmd, delay, unit);
1✔
2176
    }
2177

2178
    @Override
2179
    public ScheduledFuture<?> scheduleAtFixedRate(
2180
        Runnable command, long initialDelay, long period, TimeUnit unit) {
2181
      return delegate.scheduleAtFixedRate(command, initialDelay, period, unit);
1✔
2182
    }
2183

2184
    @Override
2185
    public ScheduledFuture<?> scheduleWithFixedDelay(
2186
        Runnable command, long initialDelay, long delay, TimeUnit unit) {
2187
      return delegate.scheduleWithFixedDelay(command, initialDelay, delay, unit);
×
2188
    }
2189

2190
    @Override
2191
    public boolean awaitTermination(long timeout, TimeUnit unit)
2192
        throws InterruptedException {
2193
      return delegate.awaitTermination(timeout, unit);
×
2194
    }
2195

2196
    @Override
2197
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
2198
        throws InterruptedException {
2199
      return delegate.invokeAll(tasks);
×
2200
    }
2201

2202
    @Override
2203
    public <T> List<Future<T>> invokeAll(
2204
        Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
2205
        throws InterruptedException {
2206
      return delegate.invokeAll(tasks, timeout, unit);
×
2207
    }
2208

2209
    @Override
2210
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
2211
        throws InterruptedException, ExecutionException {
2212
      return delegate.invokeAny(tasks);
×
2213
    }
2214

2215
    @Override
2216
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
2217
        throws InterruptedException, ExecutionException, TimeoutException {
2218
      return delegate.invokeAny(tasks, timeout, unit);
×
2219
    }
2220

2221
    @Override
2222
    public boolean isShutdown() {
2223
      return delegate.isShutdown();
×
2224
    }
2225

2226
    @Override
2227
    public boolean isTerminated() {
2228
      return delegate.isTerminated();
×
2229
    }
2230

2231
    @Override
2232
    public void shutdown() {
2233
      throw new UnsupportedOperationException("Restricted: shutdown() is not allowed");
1✔
2234
    }
2235

2236
    @Override
2237
    public List<Runnable> shutdownNow() {
2238
      throw new UnsupportedOperationException("Restricted: shutdownNow() is not allowed");
1✔
2239
    }
2240

2241
    @Override
2242
    public <T> Future<T> submit(Callable<T> task) {
2243
      return delegate.submit(task);
×
2244
    }
2245

2246
    @Override
2247
    public Future<?> submit(Runnable task) {
2248
      return delegate.submit(task);
×
2249
    }
2250

2251
    @Override
2252
    public <T> Future<T> submit(Runnable task, T result) {
2253
      return delegate.submit(task, result);
×
2254
    }
2255

2256
    @Override
2257
    public void execute(Runnable command) {
2258
      delegate.execute(command);
×
2259
    }
×
2260
  }
2261

2262
  /**
2263
   * A ResolutionState indicates the status of last name resolution.
2264
   */
2265
  enum ResolutionState {
1✔
2266
    NO_RESOLUTION,
1✔
2267
    SUCCESS,
1✔
2268
    ERROR
1✔
2269
  }
2270
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc