• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19246

23 May 2024 09:45PM UTC coverage: 88.451%. Remained the same
#19246

push

github

web-flow
core: Exit idle mode when delayed transport is in use

8844cf7b8 triggered a regression where a new RPC wouldn't cause the
channel to exit idle mode, if an RPC was still progressing on an old
transport. This was already possible previously, but was racy.
8844cf7b8 made it less racy and more obvious.

The two added `exitIdleMode()` calls in this commit are companions to
those in `enterIdleMode()`, which detect whether the channel should
immediately exit idle mode.

Noticed in cl/635819804.

32045 of 36229 relevant lines covered (88.45%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.65
/../core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
1
/*
2
 * Copyright 2016 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static com.google.common.base.Preconditions.checkState;
22
import static io.grpc.ClientStreamTracer.NAME_RESOLUTION_DELAYED;
23
import static io.grpc.ConnectivityState.CONNECTING;
24
import static io.grpc.ConnectivityState.IDLE;
25
import static io.grpc.ConnectivityState.SHUTDOWN;
26
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
27
import static io.grpc.EquivalentAddressGroup.ATTR_AUTHORITY_OVERRIDE;
28

29
import com.google.common.annotations.VisibleForTesting;
30
import com.google.common.base.MoreObjects;
31
import com.google.common.base.Stopwatch;
32
import com.google.common.base.Supplier;
33
import com.google.common.util.concurrent.ListenableFuture;
34
import com.google.common.util.concurrent.SettableFuture;
35
import io.grpc.Attributes;
36
import io.grpc.CallCredentials;
37
import io.grpc.CallOptions;
38
import io.grpc.Channel;
39
import io.grpc.ChannelCredentials;
40
import io.grpc.ChannelLogger;
41
import io.grpc.ChannelLogger.ChannelLogLevel;
42
import io.grpc.ClientCall;
43
import io.grpc.ClientInterceptor;
44
import io.grpc.ClientInterceptors;
45
import io.grpc.ClientStreamTracer;
46
import io.grpc.ClientTransportFilter;
47
import io.grpc.CompressorRegistry;
48
import io.grpc.ConnectivityState;
49
import io.grpc.ConnectivityStateInfo;
50
import io.grpc.Context;
51
import io.grpc.Deadline;
52
import io.grpc.DecompressorRegistry;
53
import io.grpc.EquivalentAddressGroup;
54
import io.grpc.ForwardingChannelBuilder2;
55
import io.grpc.ForwardingClientCall;
56
import io.grpc.Grpc;
57
import io.grpc.InternalChannelz;
58
import io.grpc.InternalChannelz.ChannelStats;
59
import io.grpc.InternalChannelz.ChannelTrace;
60
import io.grpc.InternalConfigSelector;
61
import io.grpc.InternalInstrumented;
62
import io.grpc.InternalLogId;
63
import io.grpc.InternalWithLogId;
64
import io.grpc.LoadBalancer;
65
import io.grpc.LoadBalancer.CreateSubchannelArgs;
66
import io.grpc.LoadBalancer.PickResult;
67
import io.grpc.LoadBalancer.PickSubchannelArgs;
68
import io.grpc.LoadBalancer.ResolvedAddresses;
69
import io.grpc.LoadBalancer.SubchannelPicker;
70
import io.grpc.LoadBalancer.SubchannelStateListener;
71
import io.grpc.ManagedChannel;
72
import io.grpc.ManagedChannelBuilder;
73
import io.grpc.Metadata;
74
import io.grpc.MethodDescriptor;
75
import io.grpc.MetricInstrumentRegistry;
76
import io.grpc.MetricRecorder;
77
import io.grpc.NameResolver;
78
import io.grpc.NameResolver.ConfigOrError;
79
import io.grpc.NameResolver.ResolutionResult;
80
import io.grpc.NameResolverProvider;
81
import io.grpc.NameResolverRegistry;
82
import io.grpc.ProxyDetector;
83
import io.grpc.Status;
84
import io.grpc.SynchronizationContext;
85
import io.grpc.SynchronizationContext.ScheduledHandle;
86
import io.grpc.internal.AutoConfiguredLoadBalancerFactory.AutoConfiguredLoadBalancer;
87
import io.grpc.internal.ClientCallImpl.ClientStreamProvider;
88
import io.grpc.internal.ClientTransportFactory.SwapChannelCredentialsResult;
89
import io.grpc.internal.ManagedChannelImplBuilder.ClientTransportFactoryBuilder;
90
import io.grpc.internal.ManagedChannelImplBuilder.FixedPortProvider;
91
import io.grpc.internal.ManagedChannelServiceConfig.MethodInfo;
92
import io.grpc.internal.ManagedChannelServiceConfig.ServiceConfigConvertedSelector;
93
import io.grpc.internal.RetriableStream.ChannelBufferMeter;
94
import io.grpc.internal.RetriableStream.Throttle;
95
import io.grpc.internal.RetryingNameResolver.ResolutionResultListener;
96
import java.net.URI;
97
import java.util.ArrayList;
98
import java.util.Collection;
99
import java.util.Collections;
100
import java.util.HashSet;
101
import java.util.LinkedHashSet;
102
import java.util.List;
103
import java.util.Map;
104
import java.util.Set;
105
import java.util.concurrent.Callable;
106
import java.util.concurrent.CountDownLatch;
107
import java.util.concurrent.ExecutionException;
108
import java.util.concurrent.Executor;
109
import java.util.concurrent.Future;
110
import java.util.concurrent.ScheduledExecutorService;
111
import java.util.concurrent.ScheduledFuture;
112
import java.util.concurrent.TimeUnit;
113
import java.util.concurrent.TimeoutException;
114
import java.util.concurrent.atomic.AtomicBoolean;
115
import java.util.concurrent.atomic.AtomicReference;
116
import java.util.logging.Level;
117
import java.util.logging.Logger;
118
import javax.annotation.Nullable;
119
import javax.annotation.concurrent.GuardedBy;
120
import javax.annotation.concurrent.ThreadSafe;
121

122
/** A communication channel for making outgoing RPCs. */
123
@ThreadSafe
124
final class ManagedChannelImpl extends ManagedChannel implements
125
    InternalInstrumented<ChannelStats> {
126
  @VisibleForTesting
127
  static final Logger logger = Logger.getLogger(ManagedChannelImpl.class.getName());
1✔
128

129
  static final long IDLE_TIMEOUT_MILLIS_DISABLE = -1;
130

131
  static final long SUBCHANNEL_SHUTDOWN_DELAY_SECONDS = 5;
132

133
  @VisibleForTesting
134
  static final Status SHUTDOWN_NOW_STATUS =
1✔
135
      Status.UNAVAILABLE.withDescription("Channel shutdownNow invoked");
1✔
136

137
  @VisibleForTesting
138
  static final Status SHUTDOWN_STATUS =
1✔
139
      Status.UNAVAILABLE.withDescription("Channel shutdown invoked");
1✔
140

141
  @VisibleForTesting
142
  static final Status SUBCHANNEL_SHUTDOWN_STATUS =
1✔
143
      Status.UNAVAILABLE.withDescription("Subchannel shutdown invoked");
1✔
144

145
  private static final ManagedChannelServiceConfig EMPTY_SERVICE_CONFIG =
146
      ManagedChannelServiceConfig.empty();
1✔
147
  private static final InternalConfigSelector INITIAL_PENDING_SELECTOR =
1✔
148
      new InternalConfigSelector() {
1✔
149
        @Override
150
        public Result selectConfig(PickSubchannelArgs args) {
151
          throw new IllegalStateException("Resolution is pending");
×
152
        }
153
      };
154
  private static final LoadBalancer.PickDetailsConsumer NOOP_PICK_DETAILS_CONSUMER =
1✔
155
      new LoadBalancer.PickDetailsConsumer() {};
1✔
156

157
  private final InternalLogId logId;
158
  private final String target;
159
  @Nullable
160
  private final String authorityOverride;
161
  private final NameResolverRegistry nameResolverRegistry;
162
  private final URI targetUri;
163
  private final NameResolverProvider nameResolverProvider;
164
  private final NameResolver.Args nameResolverArgs;
165
  private final AutoConfiguredLoadBalancerFactory loadBalancerFactory;
166
  private final ClientTransportFactory originalTransportFactory;
167
  @Nullable
168
  private final ChannelCredentials originalChannelCreds;
169
  private final ClientTransportFactory transportFactory;
170
  private final ClientTransportFactory oobTransportFactory;
171
  private final RestrictedScheduledExecutor scheduledExecutor;
172
  private final Executor executor;
173
  private final ObjectPool<? extends Executor> executorPool;
174
  private final ObjectPool<? extends Executor> balancerRpcExecutorPool;
175
  private final ExecutorHolder balancerRpcExecutorHolder;
176
  private final ExecutorHolder offloadExecutorHolder;
177
  private final TimeProvider timeProvider;
178
  private final int maxTraceEvents;
179

180
  @VisibleForTesting
1✔
181
  final SynchronizationContext syncContext = new SynchronizationContext(
182
      new Thread.UncaughtExceptionHandler() {
1✔
183
        @Override
184
        public void uncaughtException(Thread t, Throwable e) {
185
          logger.log(
1✔
186
              Level.SEVERE,
187
              "[" + getLogId() + "] Uncaught exception in the SynchronizationContext. Panic!",
1✔
188
              e);
189
          panic(e);
1✔
190
        }
1✔
191
      });
192

193
  private boolean fullStreamDecompression;
194

195
  private final DecompressorRegistry decompressorRegistry;
196
  private final CompressorRegistry compressorRegistry;
197

198
  private final Supplier<Stopwatch> stopwatchSupplier;
199
  /** The timeout before entering idle mode. */
200
  private final long idleTimeoutMillis;
201

202
  private final ConnectivityStateManager channelStateManager = new ConnectivityStateManager();
1✔
203
  private final BackoffPolicy.Provider backoffPolicyProvider;
204

205
  /**
206
   * We delegate to this channel, so that we can have interceptors as necessary. If there aren't
207
   * any interceptors and the {@link io.grpc.BinaryLog} is {@code null} then this will just be a
208
   * {@link RealChannel}.
209
   */
210
  private final Channel interceptorChannel;
211

212
  private final List<ClientTransportFilter> transportFilters;
213
  @Nullable private final String userAgent;
214

215
  // Only null after channel is terminated. Must be assigned from the syncContext.
216
  private NameResolver nameResolver;
217

218
  // Must be accessed from the syncContext.
219
  private boolean nameResolverStarted;
220

221
  // null when channel is in idle mode.  Must be assigned from syncContext.
222
  @Nullable
223
  private LbHelperImpl lbHelper;
224

225
  // Must ONLY be assigned from updateSubchannelPicker(), which is called from syncContext.
226
  // null if channel is in idle mode.
227
  @Nullable
228
  private volatile SubchannelPicker subchannelPicker;
229

230
  // Must be accessed from the syncContext
231
  private boolean panicMode;
232

233
  // Must be mutated from syncContext
234
  // If any monitoring hook to be added later needs to get a snapshot of this Set, we could
235
  // switch to a ConcurrentHashMap.
236
  private final Set<InternalSubchannel> subchannels = new HashSet<>(16, .75f);
1✔
237

238
  // Must be accessed from syncContext
239
  @Nullable
240
  private Collection<RealChannel.PendingCall<?, ?>> pendingCalls;
241
  private final Object pendingCallsInUseObject = new Object();
1✔
242

243
  // Must be mutated from syncContext
244
  private final Set<OobChannel> oobChannels = new HashSet<>(1, .75f);
1✔
245

246
  // reprocess() must be run from syncContext
247
  private final DelayedClientTransport delayedTransport;
248
  private final UncommittedRetriableStreamsRegistry uncommittedRetriableStreamsRegistry
1✔
249
      = new UncommittedRetriableStreamsRegistry();
250

251
  // Shutdown states.
252
  //
253
  // Channel's shutdown process:
254
  // 1. shutdown(): stop accepting new calls from applications
255
  //   1a shutdown <- true
256
  //   1b subchannelPicker <- null
257
  //   1c delayedTransport.shutdown()
258
  // 2. delayedTransport terminated: stop stream-creation functionality
259
  //   2a terminating <- true
260
  //   2b loadBalancer.shutdown()
261
  //     * LoadBalancer will shutdown subchannels and OOB channels
262
  //   2c loadBalancer <- null
263
  //   2d nameResolver.shutdown()
264
  //   2e nameResolver <- null
265
  // 3. All subchannels and OOB channels terminated: Channel considered terminated
266

267
  private final AtomicBoolean shutdown = new AtomicBoolean(false);
1✔
268
  // Must only be mutated and read from syncContext
269
  private boolean shutdownNowed;
270
  // Must only be mutated from syncContext
271
  private boolean terminating;
272
  // Must be mutated from syncContext
273
  private volatile boolean terminated;
274
  private final CountDownLatch terminatedLatch = new CountDownLatch(1);
1✔
275

276
  private final CallTracer.Factory callTracerFactory;
277
  private final CallTracer channelCallTracer;
278
  private final ChannelTracer channelTracer;
279
  private final ChannelLogger channelLogger;
280
  private final InternalChannelz channelz;
281
  private final RealChannel realChannel;
282
  // Must be mutated and read from syncContext
283
  // a flag for doing channel tracing when flipped
284
  private ResolutionState lastResolutionState = ResolutionState.NO_RESOLUTION;
1✔
285
  // Must be mutated and read from constructor or syncContext
286
  // used for channel tracing when value changed
287
  private ManagedChannelServiceConfig lastServiceConfig = EMPTY_SERVICE_CONFIG;
1✔
288

289
  @Nullable
290
  private final ManagedChannelServiceConfig defaultServiceConfig;
291
  // Must be mutated and read from constructor or syncContext
292
  private boolean serviceConfigUpdated = false;
1✔
293
  private final boolean lookUpServiceConfig;
294

295
  // One instance per channel.
296
  private final ChannelBufferMeter channelBufferUsed = new ChannelBufferMeter();
1✔
297

298
  private final long perRpcBufferLimit;
299
  private final long channelBufferLimit;
300

301
  // Temporary false flag that can skip the retry code path.
302
  private final boolean retryEnabled;
303

304
  private final Deadline.Ticker ticker = Deadline.getSystemTicker();
1✔
305

306
  // Called from syncContext
307
  private final ManagedClientTransport.Listener delayedTransportListener =
1✔
308
      new DelayedTransportListener();
309

310
  // Must be called from syncContext
311
  private void maybeShutdownNowSubchannels() {
312
    if (shutdownNowed) {
1✔
313
      for (InternalSubchannel subchannel : subchannels) {
1✔
314
        subchannel.shutdownNow(SHUTDOWN_NOW_STATUS);
1✔
315
      }
1✔
316
      for (OobChannel oobChannel : oobChannels) {
1✔
317
        oobChannel.getInternalSubchannel().shutdownNow(SHUTDOWN_NOW_STATUS);
1✔
318
      }
1✔
319
    }
320
  }
1✔
321

322
  // Must be accessed from syncContext
323
  @VisibleForTesting
1✔
324
  final InUseStateAggregator<Object> inUseStateAggregator = new IdleModeStateAggregator();
325

326
  @Override
327
  public ListenableFuture<ChannelStats> getStats() {
328
    final SettableFuture<ChannelStats> ret = SettableFuture.create();
1✔
329
    final class StatsFetcher implements Runnable {
1✔
330
      @Override
331
      public void run() {
332
        ChannelStats.Builder builder = new InternalChannelz.ChannelStats.Builder();
1✔
333
        channelCallTracer.updateBuilder(builder);
1✔
334
        channelTracer.updateBuilder(builder);
1✔
335
        builder.setTarget(target).setState(channelStateManager.getState());
1✔
336
        List<InternalWithLogId> children = new ArrayList<>();
1✔
337
        children.addAll(subchannels);
1✔
338
        children.addAll(oobChannels);
1✔
339
        builder.setSubchannels(children);
1✔
340
        ret.set(builder.build());
1✔
341
      }
1✔
342
    }
343

344
    // subchannels and oobchannels can only be accessed from syncContext
345
    syncContext.execute(new StatsFetcher());
1✔
346
    return ret;
1✔
347
  }
348

349
  @Override
350
  public InternalLogId getLogId() {
351
    return logId;
1✔
352
  }
353

354
  // Run from syncContext
355
  private class IdleModeTimer implements Runnable {
1✔
356

357
    @Override
358
    public void run() {
359
      // Workaround timer scheduled while in idle mode. This can happen from handleNotInUse() after
360
      // an explicit enterIdleMode() by the user. Protecting here as other locations are a bit too
361
      // subtle to change rapidly to resolve the channel panic. See #8714
362
      if (lbHelper == null) {
1✔
363
        return;
×
364
      }
365
      enterIdleMode();
1✔
366
    }
1✔
367
  }
368

369
  // Must be called from syncContext
370
  private void shutdownNameResolverAndLoadBalancer(boolean channelIsActive) {
371
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
372
    if (channelIsActive) {
1✔
373
      checkState(nameResolverStarted, "nameResolver is not started");
1✔
374
      checkState(lbHelper != null, "lbHelper is null");
1✔
375
    }
376
    if (nameResolver != null) {
1✔
377
      nameResolver.shutdown();
1✔
378
      nameResolverStarted = false;
1✔
379
      if (channelIsActive) {
1✔
380
        nameResolver = getNameResolver(
1✔
381
            targetUri, authorityOverride, nameResolverProvider, nameResolverArgs);
382
      } else {
383
        nameResolver = null;
1✔
384
      }
385
    }
386
    if (lbHelper != null) {
1✔
387
      lbHelper.lb.shutdown();
1✔
388
      lbHelper = null;
1✔
389
    }
390
    subchannelPicker = null;
1✔
391
  }
1✔
392

393
  /**
394
   * Make the channel exit idle mode, if it's in it.
395
   *
396
   * <p>Must be called from syncContext
397
   */
398
  @VisibleForTesting
399
  void exitIdleMode() {
400
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
401
    if (shutdown.get() || panicMode) {
1✔
402
      return;
1✔
403
    }
404
    if (inUseStateAggregator.isInUse()) {
1✔
405
      // Cancel the timer now, so that a racing due timer will not put Channel on idleness
406
      // when the caller of exitIdleMode() is about to use the returned loadBalancer.
407
      cancelIdleTimer(false);
1✔
408
    } else {
409
      // exitIdleMode() may be called outside of inUseStateAggregator.handleNotInUse() while
410
      // isInUse() == false, in which case we still need to schedule the timer.
411
      rescheduleIdleTimer();
1✔
412
    }
413
    if (lbHelper != null) {
1✔
414
      return;
1✔
415
    }
416
    channelLogger.log(ChannelLogLevel.INFO, "Exiting idle mode");
1✔
417
    LbHelperImpl lbHelper = new LbHelperImpl();
1✔
418
    lbHelper.lb = loadBalancerFactory.newLoadBalancer(lbHelper);
1✔
419
    // Delay setting lbHelper until fully initialized, since loadBalancerFactory is user code and
420
    // may throw. We don't want to confuse our state, even if we will enter panic mode.
421
    this.lbHelper = lbHelper;
1✔
422

423
    channelStateManager.gotoState(CONNECTING);
1✔
424
    NameResolverListener listener = new NameResolverListener(lbHelper, nameResolver);
1✔
425
    nameResolver.start(listener);
1✔
426
    nameResolverStarted = true;
1✔
427
  }
1✔
428

429
  // Must be run from syncContext
430
  private void enterIdleMode() {
431
    // nameResolver and loadBalancer are guaranteed to be non-null.  If any of them were null,
432
    // either the idleModeTimer ran twice without exiting the idle mode, or the task in shutdown()
433
    // did not cancel idleModeTimer, or enterIdle() ran while shutdown or in idle, all of
434
    // which are bugs.
435
    shutdownNameResolverAndLoadBalancer(true);
1✔
436
    delayedTransport.reprocess(null);
1✔
437
    channelLogger.log(ChannelLogLevel.INFO, "Entering IDLE state");
1✔
438
    channelStateManager.gotoState(IDLE);
1✔
439
    // If the inUseStateAggregator still considers pending calls to be queued up or the delayed
440
    // transport to be holding some we need to exit idle mode to give these calls a chance to
441
    // be processed.
442
    if (inUseStateAggregator.anyObjectInUse(pendingCallsInUseObject, delayedTransport)) {
1✔
443
      exitIdleMode();
1✔
444
    }
445
  }
1✔
446

447
  // Must be run from syncContext
448
  private void cancelIdleTimer(boolean permanent) {
449
    idleTimer.cancel(permanent);
1✔
450
  }
1✔
451

452
  // Always run from syncContext
453
  private void rescheduleIdleTimer() {
454
    if (idleTimeoutMillis == IDLE_TIMEOUT_MILLIS_DISABLE) {
1✔
455
      return;
1✔
456
    }
457
    idleTimer.reschedule(idleTimeoutMillis, TimeUnit.MILLISECONDS);
1✔
458
  }
1✔
459

460
  /**
461
   * Force name resolution refresh to happen immediately. Must be run
462
   * from syncContext.
463
   */
464
  private void refreshNameResolution() {
465
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
466
    if (nameResolverStarted) {
1✔
467
      nameResolver.refresh();
1✔
468
    }
469
  }
1✔
470

471
  private final class ChannelStreamProvider implements ClientStreamProvider {
1✔
472
    volatile Throttle throttle;
473

474
    @Override
475
    public ClientStream newStream(
476
        final MethodDescriptor<?, ?> method,
477
        final CallOptions callOptions,
478
        final Metadata headers,
479
        final Context context) {
480
      // There is no need to reschedule the idle timer here. If the channel isn't shut down, either
481
      // the delayed transport or a real transport will go in-use and cancel the idle timer.
482
      if (!retryEnabled) {
1✔
483
        ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers(
1✔
484
            callOptions, headers, 0, /* isTransparentRetry= */ false);
485
        Context origContext = context.attach();
1✔
486
        try {
487
          return delayedTransport.newStream(method, headers, callOptions, tracers);
1✔
488
        } finally {
489
          context.detach(origContext);
1✔
490
        }
491
      } else {
492
        MethodInfo methodInfo = callOptions.getOption(MethodInfo.KEY);
1✔
493
        final RetryPolicy retryPolicy = methodInfo == null ? null : methodInfo.retryPolicy;
1✔
494
        final HedgingPolicy hedgingPolicy = methodInfo == null ? null : methodInfo.hedgingPolicy;
1✔
495
        final class RetryStream<ReqT> extends RetriableStream<ReqT> {
496
          @SuppressWarnings("unchecked")
497
          RetryStream() {
1✔
498
            super(
1✔
499
                (MethodDescriptor<ReqT, ?>) method,
500
                headers,
501
                channelBufferUsed,
1✔
502
                perRpcBufferLimit,
1✔
503
                channelBufferLimit,
1✔
504
                getCallExecutor(callOptions),
1✔
505
                transportFactory.getScheduledExecutorService(),
1✔
506
                retryPolicy,
507
                hedgingPolicy,
508
                throttle);
509
          }
1✔
510

511
          @Override
512
          Status prestart() {
513
            return uncommittedRetriableStreamsRegistry.add(this);
1✔
514
          }
515

516
          @Override
517
          void postCommit() {
518
            uncommittedRetriableStreamsRegistry.remove(this);
1✔
519
          }
1✔
520

521
          @Override
522
          ClientStream newSubstream(
523
              Metadata newHeaders, ClientStreamTracer.Factory factory, int previousAttempts,
524
              boolean isTransparentRetry) {
525
            CallOptions newOptions = callOptions.withStreamTracerFactory(factory);
1✔
526
            ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers(
1✔
527
                newOptions, newHeaders, previousAttempts, isTransparentRetry);
528
            Context origContext = context.attach();
1✔
529
            try {
530
              return delayedTransport.newStream(method, newHeaders, newOptions, tracers);
1✔
531
            } finally {
532
              context.detach(origContext);
1✔
533
            }
534
          }
535
        }
536

537
        return new RetryStream<>();
1✔
538
      }
539
    }
540
  }
541

542
  private final ChannelStreamProvider transportProvider = new ChannelStreamProvider();
1✔
543

544
  private final Rescheduler idleTimer;
545
  private final MetricRecorder metricRecorder;
546

547
  ManagedChannelImpl(
548
      ManagedChannelImplBuilder builder,
549
      ClientTransportFactory clientTransportFactory,
550
      URI targetUri,
551
      NameResolverProvider nameResolverProvider,
552
      BackoffPolicy.Provider backoffPolicyProvider,
553
      ObjectPool<? extends Executor> balancerRpcExecutorPool,
554
      Supplier<Stopwatch> stopwatchSupplier,
555
      List<ClientInterceptor> interceptors,
556
      final TimeProvider timeProvider) {
1✔
557
    this.target = checkNotNull(builder.target, "target");
1✔
558
    this.logId = InternalLogId.allocate("Channel", target);
1✔
559
    this.timeProvider = checkNotNull(timeProvider, "timeProvider");
1✔
560
    this.executorPool = checkNotNull(builder.executorPool, "executorPool");
1✔
561
    this.executor = checkNotNull(executorPool.getObject(), "executor");
1✔
562
    this.originalChannelCreds = builder.channelCredentials;
1✔
563
    this.originalTransportFactory = clientTransportFactory;
1✔
564
    this.offloadExecutorHolder =
1✔
565
        new ExecutorHolder(checkNotNull(builder.offloadExecutorPool, "offloadExecutorPool"));
1✔
566
    this.transportFactory = new CallCredentialsApplyingTransportFactory(
1✔
567
        clientTransportFactory, builder.callCredentials, this.offloadExecutorHolder);
568
    this.oobTransportFactory = new CallCredentialsApplyingTransportFactory(
1✔
569
        clientTransportFactory, null, this.offloadExecutorHolder);
570
    this.scheduledExecutor =
1✔
571
        new RestrictedScheduledExecutor(transportFactory.getScheduledExecutorService());
1✔
572
    maxTraceEvents = builder.maxTraceEvents;
1✔
573
    channelTracer = new ChannelTracer(
1✔
574
        logId, builder.maxTraceEvents, timeProvider.currentTimeNanos(),
1✔
575
        "Channel for '" + target + "'");
576
    channelLogger = new ChannelLoggerImpl(channelTracer, timeProvider);
1✔
577
    ProxyDetector proxyDetector =
578
        builder.proxyDetector != null ? builder.proxyDetector : GrpcUtil.DEFAULT_PROXY_DETECTOR;
1✔
579
    this.retryEnabled = builder.retryEnabled;
1✔
580
    this.loadBalancerFactory = new AutoConfiguredLoadBalancerFactory(builder.defaultLbPolicy);
1✔
581
    this.nameResolverRegistry = builder.nameResolverRegistry;
1✔
582
    this.targetUri = checkNotNull(targetUri, "targetUri");
1✔
583
    this.nameResolverProvider = checkNotNull(nameResolverProvider, "nameResolverProvider");
1✔
584
    ScParser serviceConfigParser =
1✔
585
        new ScParser(
586
            retryEnabled,
587
            builder.maxRetryAttempts,
588
            builder.maxHedgedAttempts,
589
            loadBalancerFactory);
590
    this.authorityOverride = builder.authorityOverride;
1✔
591
    this.nameResolverArgs =
1✔
592
        NameResolver.Args.newBuilder()
1✔
593
            .setDefaultPort(builder.getDefaultPort())
1✔
594
            .setProxyDetector(proxyDetector)
1✔
595
            .setSynchronizationContext(syncContext)
1✔
596
            .setScheduledExecutorService(scheduledExecutor)
1✔
597
            .setServiceConfigParser(serviceConfigParser)
1✔
598
            .setChannelLogger(channelLogger)
1✔
599
            .setOffloadExecutor(this.offloadExecutorHolder)
1✔
600
            .setOverrideAuthority(this.authorityOverride)
1✔
601
            .build();
1✔
602
    this.nameResolver = getNameResolver(
1✔
603
        targetUri, authorityOverride, nameResolverProvider, nameResolverArgs);
604
    this.balancerRpcExecutorPool = checkNotNull(balancerRpcExecutorPool, "balancerRpcExecutorPool");
1✔
605
    this.balancerRpcExecutorHolder = new ExecutorHolder(balancerRpcExecutorPool);
1✔
606
    this.delayedTransport = new DelayedClientTransport(this.executor, this.syncContext);
1✔
607
    this.delayedTransport.start(delayedTransportListener);
1✔
608
    this.backoffPolicyProvider = backoffPolicyProvider;
1✔
609

610
    if (builder.defaultServiceConfig != null) {
1✔
611
      ConfigOrError parsedDefaultServiceConfig =
1✔
612
          serviceConfigParser.parseServiceConfig(builder.defaultServiceConfig);
1✔
613
      checkState(
1✔
614
          parsedDefaultServiceConfig.getError() == null,
1✔
615
          "Default config is invalid: %s",
616
          parsedDefaultServiceConfig.getError());
1✔
617
      this.defaultServiceConfig =
1✔
618
          (ManagedChannelServiceConfig) parsedDefaultServiceConfig.getConfig();
1✔
619
      this.lastServiceConfig = this.defaultServiceConfig;
1✔
620
    } else {
1✔
621
      this.defaultServiceConfig = null;
1✔
622
    }
623
    this.lookUpServiceConfig = builder.lookUpServiceConfig;
1✔
624
    realChannel = new RealChannel(nameResolver.getServiceAuthority());
1✔
625
    Channel channel = realChannel;
1✔
626
    if (builder.binlog != null) {
1✔
627
      channel = builder.binlog.wrapChannel(channel);
1✔
628
    }
629
    this.interceptorChannel = ClientInterceptors.intercept(channel, interceptors);
1✔
630
    this.transportFilters = new ArrayList<>(builder.transportFilters);
1✔
631
    this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier");
1✔
632
    if (builder.idleTimeoutMillis == IDLE_TIMEOUT_MILLIS_DISABLE) {
1✔
633
      this.idleTimeoutMillis = builder.idleTimeoutMillis;
1✔
634
    } else {
635
      checkArgument(
1✔
636
          builder.idleTimeoutMillis
637
              >= ManagedChannelImplBuilder.IDLE_MODE_MIN_TIMEOUT_MILLIS,
638
          "invalid idleTimeoutMillis %s", builder.idleTimeoutMillis);
639
      this.idleTimeoutMillis = builder.idleTimeoutMillis;
1✔
640
    }
641

642
    idleTimer = new Rescheduler(
1✔
643
        new IdleModeTimer(),
644
        syncContext,
645
        transportFactory.getScheduledExecutorService(),
1✔
646
        stopwatchSupplier.get());
1✔
647
    this.fullStreamDecompression = builder.fullStreamDecompression;
1✔
648
    this.decompressorRegistry = checkNotNull(builder.decompressorRegistry, "decompressorRegistry");
1✔
649
    this.compressorRegistry = checkNotNull(builder.compressorRegistry, "compressorRegistry");
1✔
650
    this.userAgent = builder.userAgent;
1✔
651

652
    this.channelBufferLimit = builder.retryBufferSize;
1✔
653
    this.perRpcBufferLimit = builder.perRpcBufferLimit;
1✔
654
    final class ChannelCallTracerFactory implements CallTracer.Factory {
1✔
655
      @Override
656
      public CallTracer create() {
657
        return new CallTracer(timeProvider);
1✔
658
      }
659
    }
660

661
    this.callTracerFactory = new ChannelCallTracerFactory();
1✔
662
    channelCallTracer = callTracerFactory.create();
1✔
663
    this.channelz = checkNotNull(builder.channelz);
1✔
664
    channelz.addRootChannel(this);
1✔
665

666
    if (!lookUpServiceConfig) {
1✔
667
      if (defaultServiceConfig != null) {
1✔
668
        channelLogger.log(
1✔
669
            ChannelLogLevel.INFO, "Service config look-up disabled, using default service config");
670
      }
671
      serviceConfigUpdated = true;
1✔
672
    }
673
    this.metricRecorder = new MetricRecorderImpl(builder.metricSinks,
1✔
674
        MetricInstrumentRegistry.getDefaultRegistry());
1✔
675
  }
1✔
676

677
  @VisibleForTesting
678
  static NameResolver getNameResolver(
679
      URI targetUri, @Nullable final String overrideAuthority,
680
      NameResolverProvider provider, NameResolver.Args nameResolverArgs) {
681
    NameResolver resolver = provider.newNameResolver(targetUri, nameResolverArgs);
1✔
682
    if (resolver == null) {
1✔
683
      throw new IllegalArgumentException("cannot create a NameResolver for " + targetUri);
1✔
684
    }
685

686
    // We wrap the name resolver in a RetryingNameResolver to give it the ability to retry failures.
687
    // TODO: After a transition period, all NameResolver implementations that need retry should use
688
    //       RetryingNameResolver directly and this step can be removed.
689
    NameResolver usedNameResolver = new RetryingNameResolver(resolver,
1✔
690
          new BackoffPolicyRetryScheduler(new ExponentialBackoffPolicy.Provider(),
691
              nameResolverArgs.getScheduledExecutorService(),
1✔
692
              nameResolverArgs.getSynchronizationContext()),
1✔
693
          nameResolverArgs.getSynchronizationContext());
1✔
694

695
    if (overrideAuthority == null) {
1✔
696
      return usedNameResolver;
1✔
697
    }
698

699
    return new ForwardingNameResolver(usedNameResolver) {
1✔
700
      @Override
701
      public String getServiceAuthority() {
702
        return overrideAuthority;
1✔
703
      }
704
    };
705
  }
706

707
  @VisibleForTesting
708
  InternalConfigSelector getConfigSelector() {
709
    return realChannel.configSelector.get();
1✔
710
  }
711

712
  /**
713
   * Initiates an orderly shutdown in which preexisting calls continue but new calls are immediately
714
   * cancelled.
715
   */
716
  @Override
717
  public ManagedChannelImpl shutdown() {
718
    channelLogger.log(ChannelLogLevel.DEBUG, "shutdown() called");
1✔
719
    if (!shutdown.compareAndSet(false, true)) {
1✔
720
      return this;
1✔
721
    }
722
    final class Shutdown implements Runnable {
1✔
723
      @Override
724
      public void run() {
725
        channelLogger.log(ChannelLogLevel.INFO, "Entering SHUTDOWN state");
1✔
726
        channelStateManager.gotoState(SHUTDOWN);
1✔
727
      }
1✔
728
    }
729

730
    syncContext.execute(new Shutdown());
1✔
731
    realChannel.shutdown();
1✔
732
    final class CancelIdleTimer implements Runnable {
1✔
733
      @Override
734
      public void run() {
735
        cancelIdleTimer(/* permanent= */ true);
1✔
736
      }
1✔
737
    }
738

739
    syncContext.execute(new CancelIdleTimer());
1✔
740
    return this;
1✔
741
  }
742

743
  /**
744
   * Initiates a forceful shutdown in which preexisting and new calls are cancelled. Although
745
   * forceful, the shutdown process is still not instantaneous; {@link #isTerminated()} will likely
746
   * return {@code false} immediately after this method returns.
747
   */
748
  @Override
749
  public ManagedChannelImpl shutdownNow() {
750
    channelLogger.log(ChannelLogLevel.DEBUG, "shutdownNow() called");
1✔
751
    shutdown();
1✔
752
    realChannel.shutdownNow();
1✔
753
    final class ShutdownNow implements Runnable {
1✔
754
      @Override
755
      public void run() {
756
        if (shutdownNowed) {
1✔
757
          return;
1✔
758
        }
759
        shutdownNowed = true;
1✔
760
        maybeShutdownNowSubchannels();
1✔
761
      }
1✔
762
    }
763

764
    syncContext.execute(new ShutdownNow());
1✔
765
    return this;
1✔
766
  }
767

768
  // Called from syncContext
769
  @VisibleForTesting
770
  void panic(final Throwable t) {
771
    if (panicMode) {
1✔
772
      // Preserve the first panic information
773
      return;
×
774
    }
775
    panicMode = true;
1✔
776
    cancelIdleTimer(/* permanent= */ true);
1✔
777
    shutdownNameResolverAndLoadBalancer(false);
1✔
778
    final class PanicSubchannelPicker extends SubchannelPicker {
1✔
779
      private final PickResult panicPickResult =
1✔
780
          PickResult.withDrop(
1✔
781
              Status.INTERNAL.withDescription("Panic! This is a bug!").withCause(t));
1✔
782

783
      @Override
784
      public PickResult pickSubchannel(PickSubchannelArgs args) {
785
        return panicPickResult;
1✔
786
      }
787

788
      @Override
789
      public String toString() {
790
        return MoreObjects.toStringHelper(PanicSubchannelPicker.class)
×
791
            .add("panicPickResult", panicPickResult)
×
792
            .toString();
×
793
      }
794
    }
795

796
    updateSubchannelPicker(new PanicSubchannelPicker());
1✔
797
    realChannel.updateConfigSelector(null);
1✔
798
    channelLogger.log(ChannelLogLevel.ERROR, "PANIC! Entering TRANSIENT_FAILURE");
1✔
799
    channelStateManager.gotoState(TRANSIENT_FAILURE);
1✔
800
  }
1✔
801

802
  @VisibleForTesting
803
  boolean isInPanicMode() {
804
    return panicMode;
1✔
805
  }
806

807
  // Called from syncContext
808
  private void updateSubchannelPicker(SubchannelPicker newPicker) {
809
    subchannelPicker = newPicker;
1✔
810
    delayedTransport.reprocess(newPicker);
1✔
811
  }
1✔
812

813
  @Override
814
  public boolean isShutdown() {
815
    return shutdown.get();
1✔
816
  }
817

818
  @Override
819
  public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
820
    return terminatedLatch.await(timeout, unit);
1✔
821
  }
822

823
  @Override
824
  public boolean isTerminated() {
825
    return terminated;
1✔
826
  }
827

828
  /*
829
   * Creates a new outgoing call on the channel.
830
   */
831
  @Override
832
  public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(MethodDescriptor<ReqT, RespT> method,
833
      CallOptions callOptions) {
834
    return interceptorChannel.newCall(method, callOptions);
1✔
835
  }
836

837
  @Override
838
  public String authority() {
839
    return interceptorChannel.authority();
1✔
840
  }
841

842
  private Executor getCallExecutor(CallOptions callOptions) {
843
    Executor executor = callOptions.getExecutor();
1✔
844
    if (executor == null) {
1✔
845
      executor = this.executor;
1✔
846
    }
847
    return executor;
1✔
848
  }
849

850
  private class RealChannel extends Channel {
851
    // Reference to null if no config selector is available from resolution result
852
    // Reference must be set() from syncContext
853
    private final AtomicReference<InternalConfigSelector> configSelector =
1✔
854
        new AtomicReference<>(INITIAL_PENDING_SELECTOR);
1✔
855
    // Set when the NameResolver is initially created. When we create a new NameResolver for the
856
    // same target, the new instance must have the same value.
857
    private final String authority;
858

859
    private final Channel clientCallImplChannel = new Channel() {
1✔
860
      @Override
861
      public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
862
          MethodDescriptor<RequestT, ResponseT> method, CallOptions callOptions) {
863
        return new ClientCallImpl<>(
1✔
864
            method,
865
            getCallExecutor(callOptions),
1✔
866
            callOptions,
867
            transportProvider,
1✔
868
            terminated ? null : transportFactory.getScheduledExecutorService(),
1✔
869
            channelCallTracer,
1✔
870
            null)
871
            .setFullStreamDecompression(fullStreamDecompression)
1✔
872
            .setDecompressorRegistry(decompressorRegistry)
1✔
873
            .setCompressorRegistry(compressorRegistry);
1✔
874
      }
875

876
      @Override
877
      public String authority() {
878
        return authority;
×
879
      }
880
    };
881

882
    private RealChannel(String authority) {
1✔
883
      this.authority =  checkNotNull(authority, "authority");
1✔
884
    }
1✔
885

886
    @Override
887
    public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(
888
        MethodDescriptor<ReqT, RespT> method, CallOptions callOptions) {
889
      if (configSelector.get() != INITIAL_PENDING_SELECTOR) {
1✔
890
        return newClientCall(method, callOptions);
1✔
891
      }
892
      if (configSelector.get() != INITIAL_PENDING_SELECTOR) {
1✔
893
        // This is an optimization for the case (typically with InProcessTransport) when name
894
        // resolution result is immediately available at this point. Otherwise, some users'
895
        // tests might observe slight behavior difference from earlier grpc versions.
896
        return newClientCall(method, callOptions);
×
897
      }
898
      if (shutdown.get()) {
1✔
899
        // Return a failing ClientCall.
900
        return new ClientCall<ReqT, RespT>() {
×
901
          @Override
902
          public void start(Listener<RespT> responseListener, Metadata headers) {
903
            responseListener.onClose(SHUTDOWN_STATUS, new Metadata());
×
904
          }
×
905

906
          @Override public void request(int numMessages) {}
×
907

908
          @Override public void cancel(@Nullable String message, @Nullable Throwable cause) {}
×
909

910
          @Override public void halfClose() {}
×
911

912
          @Override public void sendMessage(ReqT message) {}
×
913
        };
914
      }
915
      Context context = Context.current();
1✔
916
      final PendingCall<ReqT, RespT> pendingCall = new PendingCall<>(context, method, callOptions);
1✔
917
      syncContext.execute(new Runnable() {
1✔
918
        @Override
919
        public void run() {
920
          if (configSelector.get() == INITIAL_PENDING_SELECTOR) {
1✔
921
            if (pendingCalls == null) {
1✔
922
              pendingCalls = new LinkedHashSet<>();
1✔
923
              inUseStateAggregator.updateObjectInUse(pendingCallsInUseObject, true);
1✔
924
              // It's possible to be in idle mode while inUseStateAggregator is in-use, if one of
925
              // the subchannels is in use. But we should never be in idle mode when pendingCalls is
926
              // in use.
927
              exitIdleMode();
1✔
928
            }
929
            pendingCalls.add(pendingCall);
1✔
930
          } else {
931
            pendingCall.reprocess();
1✔
932
          }
933
        }
1✔
934
      });
935
      return pendingCall;
1✔
936
    }
937

938
    // Must run in SynchronizationContext.
939
    void updateConfigSelector(@Nullable InternalConfigSelector config) {
940
      InternalConfigSelector prevConfig = configSelector.get();
1✔
941
      configSelector.set(config);
1✔
942
      if (prevConfig == INITIAL_PENDING_SELECTOR && pendingCalls != null) {
1✔
943
        for (RealChannel.PendingCall<?, ?> pendingCall : pendingCalls) {
1✔
944
          pendingCall.reprocess();
1✔
945
        }
1✔
946
      }
947
    }
1✔
948

949
    // Must run in SynchronizationContext.
950
    void onConfigError() {
951
      if (configSelector.get() == INITIAL_PENDING_SELECTOR) {
1✔
952
        updateConfigSelector(null);
1✔
953
      }
954
    }
1✔
955

956
    void shutdown() {
957
      final class RealChannelShutdown implements Runnable {
1✔
958
        @Override
959
        public void run() {
960
          if (pendingCalls == null) {
1✔
961
            if (configSelector.get() == INITIAL_PENDING_SELECTOR) {
1✔
962
              configSelector.set(null);
1✔
963
            }
964
            uncommittedRetriableStreamsRegistry.onShutdown(SHUTDOWN_STATUS);
1✔
965
          }
966
        }
1✔
967
      }
968

969
      syncContext.execute(new RealChannelShutdown());
1✔
970
    }
1✔
971

972
    void shutdownNow() {
973
      final class RealChannelShutdownNow implements Runnable {
1✔
974
        @Override
975
        public void run() {
976
          if (configSelector.get() == INITIAL_PENDING_SELECTOR) {
1✔
977
            configSelector.set(null);
1✔
978
          }
979
          if (pendingCalls != null) {
1✔
980
            for (RealChannel.PendingCall<?, ?> pendingCall : pendingCalls) {
1✔
981
              pendingCall.cancel("Channel is forcefully shutdown", null);
1✔
982
            }
1✔
983
          }
984
          uncommittedRetriableStreamsRegistry.onShutdownNow(SHUTDOWN_NOW_STATUS);
1✔
985
        }
1✔
986
      }
987

988
      syncContext.execute(new RealChannelShutdownNow());
1✔
989
    }
1✔
990

991
    @Override
992
    public String authority() {
993
      return authority;
1✔
994
    }
995

996
    private final class PendingCall<ReqT, RespT> extends DelayedClientCall<ReqT, RespT> {
997
      final Context context;
998
      final MethodDescriptor<ReqT, RespT> method;
999
      final CallOptions callOptions;
1000
      private final long callCreationTime;
1001

1002
      PendingCall(
1003
          Context context, MethodDescriptor<ReqT, RespT> method, CallOptions callOptions) {
1✔
1004
        super(getCallExecutor(callOptions), scheduledExecutor, callOptions.getDeadline());
1✔
1005
        this.context = context;
1✔
1006
        this.method = method;
1✔
1007
        this.callOptions = callOptions;
1✔
1008
        this.callCreationTime = ticker.nanoTime();
1✔
1009
      }
1✔
1010

1011
      /** Called when it's ready to create a real call and reprocess the pending call. */
1012
      void reprocess() {
1013
        ClientCall<ReqT, RespT> realCall;
1014
        Context previous = context.attach();
1✔
1015
        try {
1016
          CallOptions delayResolutionOption = callOptions.withOption(NAME_RESOLUTION_DELAYED,
1✔
1017
              ticker.nanoTime() - callCreationTime);
1✔
1018
          realCall = newClientCall(method, delayResolutionOption);
1✔
1019
        } finally {
1020
          context.detach(previous);
1✔
1021
        }
1022
        Runnable toRun = setCall(realCall);
1✔
1023
        if (toRun == null) {
1✔
1024
          syncContext.execute(new PendingCallRemoval());
1✔
1025
        } else {
1026
          getCallExecutor(callOptions).execute(new Runnable() {
1✔
1027
            @Override
1028
            public void run() {
1029
              toRun.run();
1✔
1030
              syncContext.execute(new PendingCallRemoval());
1✔
1031
            }
1✔
1032
          });
1033
        }
1034
      }
1✔
1035

1036
      @Override
1037
      protected void callCancelled() {
1038
        super.callCancelled();
1✔
1039
        syncContext.execute(new PendingCallRemoval());
1✔
1040
      }
1✔
1041

1042
      final class PendingCallRemoval implements Runnable {
1✔
1043
        @Override
1044
        public void run() {
1045
          if (pendingCalls != null) {
1✔
1046
            pendingCalls.remove(PendingCall.this);
1✔
1047
            if (pendingCalls.isEmpty()) {
1✔
1048
              inUseStateAggregator.updateObjectInUse(pendingCallsInUseObject, false);
1✔
1049
              pendingCalls = null;
1✔
1050
              if (shutdown.get()) {
1✔
1051
                uncommittedRetriableStreamsRegistry.onShutdown(SHUTDOWN_STATUS);
1✔
1052
              }
1053
            }
1054
          }
1055
        }
1✔
1056
      }
1057
    }
1058

1059
    private <ReqT, RespT> ClientCall<ReqT, RespT> newClientCall(
1060
        MethodDescriptor<ReqT, RespT> method, CallOptions callOptions) {
1061
      InternalConfigSelector selector = configSelector.get();
1✔
1062
      if (selector == null) {
1✔
1063
        return clientCallImplChannel.newCall(method, callOptions);
1✔
1064
      }
1065
      if (selector instanceof ServiceConfigConvertedSelector) {
1✔
1066
        MethodInfo methodInfo =
1✔
1067
            ((ServiceConfigConvertedSelector) selector).config.getMethodConfig(method);
1✔
1068
        if (methodInfo != null) {
1✔
1069
          callOptions = callOptions.withOption(MethodInfo.KEY, methodInfo);
1✔
1070
        }
1071
        return clientCallImplChannel.newCall(method, callOptions);
1✔
1072
      }
1073
      return new ConfigSelectingClientCall<>(
1✔
1074
          selector, clientCallImplChannel, executor, method, callOptions);
1✔
1075
    }
1076
  }
1077

1078
  /**
1079
   * A client call for a given channel that applies a given config selector when it starts.
1080
   */
1081
  static final class ConfigSelectingClientCall<ReqT, RespT>
1082
      extends ForwardingClientCall<ReqT, RespT> {
1083

1084
    private final InternalConfigSelector configSelector;
1085
    private final Channel channel;
1086
    private final Executor callExecutor;
1087
    private final MethodDescriptor<ReqT, RespT> method;
1088
    private final Context context;
1089
    private CallOptions callOptions;
1090

1091
    private ClientCall<ReqT, RespT> delegate;
1092

1093
    ConfigSelectingClientCall(
1094
        InternalConfigSelector configSelector, Channel channel, Executor channelExecutor,
1095
        MethodDescriptor<ReqT, RespT> method,
1096
        CallOptions callOptions) {
1✔
1097
      this.configSelector = configSelector;
1✔
1098
      this.channel = channel;
1✔
1099
      this.method = method;
1✔
1100
      this.callExecutor =
1✔
1101
          callOptions.getExecutor() == null ? channelExecutor : callOptions.getExecutor();
1✔
1102
      this.callOptions = callOptions.withExecutor(callExecutor);
1✔
1103
      this.context = Context.current();
1✔
1104
    }
1✔
1105

1106
    @Override
1107
    protected ClientCall<ReqT, RespT> delegate() {
1108
      return delegate;
1✔
1109
    }
1110

1111
    @SuppressWarnings("unchecked")
1112
    @Override
1113
    public void start(Listener<RespT> observer, Metadata headers) {
1114
      PickSubchannelArgs args =
1✔
1115
          new PickSubchannelArgsImpl(method, headers, callOptions, NOOP_PICK_DETAILS_CONSUMER);
1✔
1116
      InternalConfigSelector.Result result = configSelector.selectConfig(args);
1✔
1117
      Status status = result.getStatus();
1✔
1118
      if (!status.isOk()) {
1✔
1119
        executeCloseObserverInContext(observer,
1✔
1120
            GrpcUtil.replaceInappropriateControlPlaneStatus(status));
1✔
1121
        delegate = (ClientCall<ReqT, RespT>) NOOP_CALL;
1✔
1122
        return;
1✔
1123
      }
1124
      ClientInterceptor interceptor = result.getInterceptor();
1✔
1125
      ManagedChannelServiceConfig config = (ManagedChannelServiceConfig) result.getConfig();
1✔
1126
      MethodInfo methodInfo = config.getMethodConfig(method);
1✔
1127
      if (methodInfo != null) {
1✔
1128
        callOptions = callOptions.withOption(MethodInfo.KEY, methodInfo);
1✔
1129
      }
1130
      if (interceptor != null) {
1✔
1131
        delegate = interceptor.interceptCall(method, callOptions, channel);
1✔
1132
      } else {
1133
        delegate = channel.newCall(method, callOptions);
×
1134
      }
1135
      delegate.start(observer, headers);
1✔
1136
    }
1✔
1137

1138
    private void executeCloseObserverInContext(
1139
        final Listener<RespT> observer, final Status status) {
1140
      class CloseInContext extends ContextRunnable {
1141
        CloseInContext() {
1✔
1142
          super(context);
1✔
1143
        }
1✔
1144

1145
        @Override
1146
        public void runInContext() {
1147
          observer.onClose(status, new Metadata());
1✔
1148
        }
1✔
1149
      }
1150

1151
      callExecutor.execute(new CloseInContext());
1✔
1152
    }
1✔
1153

1154
    @Override
1155
    public void cancel(@Nullable String message, @Nullable Throwable cause) {
1156
      if (delegate != null) {
×
1157
        delegate.cancel(message, cause);
×
1158
      }
1159
    }
×
1160
  }
1161

1162
  private static final ClientCall<Object, Object> NOOP_CALL = new ClientCall<Object, Object>() {
1✔
1163
    @Override
1164
    public void start(Listener<Object> responseListener, Metadata headers) {}
×
1165

1166
    @Override
1167
    public void request(int numMessages) {}
1✔
1168

1169
    @Override
1170
    public void cancel(String message, Throwable cause) {}
×
1171

1172
    @Override
1173
    public void halfClose() {}
×
1174

1175
    @Override
1176
    public void sendMessage(Object message) {}
×
1177

1178
    // Always returns {@code false}, since this is only used when the startup of the call fails.
1179
    @Override
1180
    public boolean isReady() {
1181
      return false;
×
1182
    }
1183
  };
1184

1185
  /**
1186
   * Terminate the channel if termination conditions are met.
1187
   */
1188
  // Must be run from syncContext
1189
  private void maybeTerminateChannel() {
1190
    if (terminated) {
1✔
1191
      return;
×
1192
    }
1193
    if (shutdown.get() && subchannels.isEmpty() && oobChannels.isEmpty()) {
1✔
1194
      channelLogger.log(ChannelLogLevel.INFO, "Terminated");
1✔
1195
      channelz.removeRootChannel(this);
1✔
1196
      executorPool.returnObject(executor);
1✔
1197
      balancerRpcExecutorHolder.release();
1✔
1198
      offloadExecutorHolder.release();
1✔
1199
      // Release the transport factory so that it can deallocate any resources.
1200
      transportFactory.close();
1✔
1201

1202
      terminated = true;
1✔
1203
      terminatedLatch.countDown();
1✔
1204
    }
1205
  }
1✔
1206

1207
  // Must be called from syncContext
1208
  private void handleInternalSubchannelState(ConnectivityStateInfo newState) {
1209
    if (newState.getState() == TRANSIENT_FAILURE || newState.getState() == IDLE) {
1✔
1210
      refreshNameResolution();
1✔
1211
    }
1212
  }
1✔
1213

1214
  @Override
1215
  @SuppressWarnings("deprecation")
1216
  public ConnectivityState getState(boolean requestConnection) {
1217
    ConnectivityState savedChannelState = channelStateManager.getState();
1✔
1218
    if (requestConnection && savedChannelState == IDLE) {
1✔
1219
      final class RequestConnection implements Runnable {
1✔
1220
        @Override
1221
        public void run() {
1222
          exitIdleMode();
1✔
1223
          if (subchannelPicker != null) {
1✔
1224
            subchannelPicker.requestConnection();
1✔
1225
          }
1226
          if (lbHelper != null) {
1✔
1227
            lbHelper.lb.requestConnection();
1✔
1228
          }
1229
        }
1✔
1230
      }
1231

1232
      syncContext.execute(new RequestConnection());
1✔
1233
    }
1234
    return savedChannelState;
1✔
1235
  }
1236

1237
  @Override
1238
  public void notifyWhenStateChanged(final ConnectivityState source, final Runnable callback) {
1239
    final class NotifyStateChanged implements Runnable {
1✔
1240
      @Override
1241
      public void run() {
1242
        channelStateManager.notifyWhenStateChanged(callback, executor, source);
1✔
1243
      }
1✔
1244
    }
1245

1246
    syncContext.execute(new NotifyStateChanged());
1✔
1247
  }
1✔
1248

1249
  @Override
1250
  public void resetConnectBackoff() {
1251
    final class ResetConnectBackoff implements Runnable {
1✔
1252
      @Override
1253
      public void run() {
1254
        if (shutdown.get()) {
1✔
1255
          return;
1✔
1256
        }
1257
        if (nameResolverStarted) {
1✔
1258
          refreshNameResolution();
1✔
1259
        }
1260
        for (InternalSubchannel subchannel : subchannels) {
1✔
1261
          subchannel.resetConnectBackoff();
1✔
1262
        }
1✔
1263
        for (OobChannel oobChannel : oobChannels) {
1✔
1264
          oobChannel.resetConnectBackoff();
×
1265
        }
×
1266
      }
1✔
1267
    }
1268

1269
    syncContext.execute(new ResetConnectBackoff());
1✔
1270
  }
1✔
1271

1272
  @Override
1273
  public void enterIdle() {
1274
    final class PrepareToLoseNetworkRunnable implements Runnable {
1✔
1275
      @Override
1276
      public void run() {
1277
        if (shutdown.get() || lbHelper == null) {
1✔
1278
          return;
1✔
1279
        }
1280
        cancelIdleTimer(/* permanent= */ false);
1✔
1281
        enterIdleMode();
1✔
1282
      }
1✔
1283
    }
1284

1285
    syncContext.execute(new PrepareToLoseNetworkRunnable());
1✔
1286
  }
1✔
1287

1288
  /**
1289
   * A registry that prevents channel shutdown from killing existing retry attempts that are in
1290
   * backoff.
1291
   */
1292
  private final class UncommittedRetriableStreamsRegistry {
1✔
1293
    // TODO(zdapeng): This means we would acquire a lock for each new retry-able stream,
1294
    // it's worthwhile to look for a lock-free approach.
1295
    final Object lock = new Object();
1✔
1296

1297
    @GuardedBy("lock")
1✔
1298
    Collection<ClientStream> uncommittedRetriableStreams = new HashSet<>();
1299

1300
    @GuardedBy("lock")
1301
    Status shutdownStatus;
1302

1303
    void onShutdown(Status reason) {
1304
      boolean shouldShutdownDelayedTransport = false;
1✔
1305
      synchronized (lock) {
1✔
1306
        if (shutdownStatus != null) {
1✔
1307
          return;
1✔
1308
        }
1309
        shutdownStatus = reason;
1✔
1310
        // Keep the delayedTransport open until there is no more uncommitted streams, b/c those
1311
        // retriable streams, which may be in backoff and not using any transport, are already
1312
        // started RPCs.
1313
        if (uncommittedRetriableStreams.isEmpty()) {
1✔
1314
          shouldShutdownDelayedTransport = true;
1✔
1315
        }
1316
      }
1✔
1317

1318
      if (shouldShutdownDelayedTransport) {
1✔
1319
        delayedTransport.shutdown(reason);
1✔
1320
      }
1321
    }
1✔
1322

1323
    void onShutdownNow(Status reason) {
1324
      onShutdown(reason);
1✔
1325
      Collection<ClientStream> streams;
1326

1327
      synchronized (lock) {
1✔
1328
        streams = new ArrayList<>(uncommittedRetriableStreams);
1✔
1329
      }
1✔
1330

1331
      for (ClientStream stream : streams) {
1✔
1332
        stream.cancel(reason);
1✔
1333
      }
1✔
1334
      delayedTransport.shutdownNow(reason);
1✔
1335
    }
1✔
1336

1337
    /**
1338
     * Registers a RetriableStream and return null if not shutdown, otherwise just returns the
1339
     * shutdown Status.
1340
     */
1341
    @Nullable
1342
    Status add(RetriableStream<?> retriableStream) {
1343
      synchronized (lock) {
1✔
1344
        if (shutdownStatus != null) {
1✔
1345
          return shutdownStatus;
1✔
1346
        }
1347
        uncommittedRetriableStreams.add(retriableStream);
1✔
1348
        return null;
1✔
1349
      }
1350
    }
1351

1352
    void remove(RetriableStream<?> retriableStream) {
1353
      Status shutdownStatusCopy = null;
1✔
1354

1355
      synchronized (lock) {
1✔
1356
        uncommittedRetriableStreams.remove(retriableStream);
1✔
1357
        if (uncommittedRetriableStreams.isEmpty()) {
1✔
1358
          shutdownStatusCopy = shutdownStatus;
1✔
1359
          // Because retriable transport is long-lived, we take this opportunity to down-size the
1360
          // hashmap.
1361
          uncommittedRetriableStreams = new HashSet<>();
1✔
1362
        }
1363
      }
1✔
1364

1365
      if (shutdownStatusCopy != null) {
1✔
1366
        delayedTransport.shutdown(shutdownStatusCopy);
1✔
1367
      }
1368
    }
1✔
1369
  }
1370

1371
  private final class LbHelperImpl extends LoadBalancer.Helper {
1✔
1372
    AutoConfiguredLoadBalancer lb;
1373

1374
    @Override
1375
    public AbstractSubchannel createSubchannel(CreateSubchannelArgs args) {
1376
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1377
      // No new subchannel should be created after load balancer has been shutdown.
1378
      checkState(!terminating, "Channel is being terminated");
1✔
1379
      return new SubchannelImpl(args);
1✔
1380
    }
1381

1382
    @Override
1383
    public void updateBalancingState(
1384
        final ConnectivityState newState, final SubchannelPicker newPicker) {
1385
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1386
      checkNotNull(newState, "newState");
1✔
1387
      checkNotNull(newPicker, "newPicker");
1✔
1388
      final class UpdateBalancingState implements Runnable {
1✔
1389
        @Override
1390
        public void run() {
1391
          if (LbHelperImpl.this != lbHelper) {
1✔
1392
            return;
1✔
1393
          }
1394
          updateSubchannelPicker(newPicker);
1✔
1395
          // It's not appropriate to report SHUTDOWN state from lb.
1396
          // Ignore the case of newState == SHUTDOWN for now.
1397
          if (newState != SHUTDOWN) {
1✔
1398
            channelLogger.log(
1✔
1399
                ChannelLogLevel.INFO, "Entering {0} state with picker: {1}", newState, newPicker);
1400
            channelStateManager.gotoState(newState);
1✔
1401
          }
1402
        }
1✔
1403
      }
1404

1405
      syncContext.execute(new UpdateBalancingState());
1✔
1406
    }
1✔
1407

1408
    @Override
1409
    public void refreshNameResolution() {
1410
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1411
      final class LoadBalancerRefreshNameResolution implements Runnable {
1✔
1412
        @Override
1413
        public void run() {
1414
          ManagedChannelImpl.this.refreshNameResolution();
1✔
1415
        }
1✔
1416
      }
1417

1418
      syncContext.execute(new LoadBalancerRefreshNameResolution());
1✔
1419
    }
1✔
1420

1421
    @Override
1422
    public ManagedChannel createOobChannel(EquivalentAddressGroup addressGroup, String authority) {
1423
      return createOobChannel(Collections.singletonList(addressGroup), authority);
×
1424
    }
1425

1426
    @Override
1427
    public ManagedChannel createOobChannel(List<EquivalentAddressGroup> addressGroup,
1428
        String authority) {
1429
      // TODO(ejona): can we be even stricter? Like terminating?
1430
      checkState(!terminated, "Channel is terminated");
1✔
1431
      long oobChannelCreationTime = timeProvider.currentTimeNanos();
1✔
1432
      InternalLogId oobLogId = InternalLogId.allocate("OobChannel", /*details=*/ null);
1✔
1433
      InternalLogId subchannelLogId =
1✔
1434
          InternalLogId.allocate("Subchannel-OOB", /*details=*/ authority);
1✔
1435
      ChannelTracer oobChannelTracer =
1✔
1436
          new ChannelTracer(
1437
              oobLogId, maxTraceEvents, oobChannelCreationTime,
1✔
1438
              "OobChannel for " + addressGroup);
1439
      final OobChannel oobChannel = new OobChannel(
1✔
1440
          authority, balancerRpcExecutorPool, oobTransportFactory.getScheduledExecutorService(),
1✔
1441
          syncContext, callTracerFactory.create(), oobChannelTracer, channelz, timeProvider);
1✔
1442
      channelTracer.reportEvent(new ChannelTrace.Event.Builder()
1✔
1443
          .setDescription("Child OobChannel created")
1✔
1444
          .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
1✔
1445
          .setTimestampNanos(oobChannelCreationTime)
1✔
1446
          .setChannelRef(oobChannel)
1✔
1447
          .build());
1✔
1448
      ChannelTracer subchannelTracer =
1✔
1449
          new ChannelTracer(subchannelLogId, maxTraceEvents, oobChannelCreationTime,
1✔
1450
              "Subchannel for " + addressGroup);
1451
      ChannelLogger subchannelLogger = new ChannelLoggerImpl(subchannelTracer, timeProvider);
1✔
1452
      final class ManagedOobChannelCallback extends InternalSubchannel.Callback {
1✔
1453
        @Override
1454
        void onTerminated(InternalSubchannel is) {
1455
          oobChannels.remove(oobChannel);
1✔
1456
          channelz.removeSubchannel(is);
1✔
1457
          oobChannel.handleSubchannelTerminated();
1✔
1458
          maybeTerminateChannel();
1✔
1459
        }
1✔
1460

1461
        @Override
1462
        void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) {
1463
          // TODO(chengyuanzhang): change to let LB policies explicitly manage OOB channel's
1464
          //  state and refresh name resolution if necessary.
1465
          handleInternalSubchannelState(newState);
1✔
1466
          oobChannel.handleSubchannelStateChange(newState);
1✔
1467
        }
1✔
1468
      }
1469

1470
      final InternalSubchannel internalSubchannel = new InternalSubchannel(
1✔
1471
          addressGroup,
1472
          authority, userAgent, backoffPolicyProvider, oobTransportFactory,
1✔
1473
          oobTransportFactory.getScheduledExecutorService(), stopwatchSupplier, syncContext,
1✔
1474
          // All callback methods are run from syncContext
1475
          new ManagedOobChannelCallback(),
1476
          channelz,
1✔
1477
          callTracerFactory.create(),
1✔
1478
          subchannelTracer,
1479
          subchannelLogId,
1480
          subchannelLogger,
1481
          transportFilters);
1✔
1482
      oobChannelTracer.reportEvent(new ChannelTrace.Event.Builder()
1✔
1483
          .setDescription("Child Subchannel created")
1✔
1484
          .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
1✔
1485
          .setTimestampNanos(oobChannelCreationTime)
1✔
1486
          .setSubchannelRef(internalSubchannel)
1✔
1487
          .build());
1✔
1488
      channelz.addSubchannel(oobChannel);
1✔
1489
      channelz.addSubchannel(internalSubchannel);
1✔
1490
      oobChannel.setSubchannel(internalSubchannel);
1✔
1491
      final class AddOobChannel implements Runnable {
1✔
1492
        @Override
1493
        public void run() {
1494
          if (terminating) {
1✔
1495
            oobChannel.shutdown();
×
1496
          }
1497
          if (!terminated) {
1✔
1498
            // If channel has not terminated, it will track the subchannel and block termination
1499
            // for it.
1500
            oobChannels.add(oobChannel);
1✔
1501
          }
1502
        }
1✔
1503
      }
1504

1505
      syncContext.execute(new AddOobChannel());
1✔
1506
      return oobChannel;
1✔
1507
    }
1508

1509
    @Deprecated
1510
    @Override
1511
    public ManagedChannelBuilder<?> createResolvingOobChannelBuilder(String target) {
1512
      return createResolvingOobChannelBuilder(target, new DefaultChannelCreds())
1✔
1513
          // Override authority to keep the old behavior.
1514
          // createResolvingOobChannelBuilder(String target) will be deleted soon.
1515
          .overrideAuthority(getAuthority());
1✔
1516
    }
1517

1518
    // TODO(creamsoup) prevent main channel to shutdown if oob channel is not terminated
1519
    // TODO(zdapeng) register the channel as a subchannel of the parent channel in channelz.
1520
    @Override
1521
    public ManagedChannelBuilder<?> createResolvingOobChannelBuilder(
1522
        final String target, final ChannelCredentials channelCreds) {
1523
      checkNotNull(channelCreds, "channelCreds");
1✔
1524

1525
      final class ResolvingOobChannelBuilder
1526
          extends ForwardingChannelBuilder2<ResolvingOobChannelBuilder> {
1527
        final ManagedChannelBuilder<?> delegate;
1528

1529
        ResolvingOobChannelBuilder() {
1✔
1530
          final ClientTransportFactory transportFactory;
1531
          CallCredentials callCredentials;
1532
          if (channelCreds instanceof DefaultChannelCreds) {
1✔
1533
            transportFactory = originalTransportFactory;
1✔
1534
            callCredentials = null;
1✔
1535
          } else {
1536
            SwapChannelCredentialsResult swapResult =
1✔
1537
                originalTransportFactory.swapChannelCredentials(channelCreds);
1✔
1538
            if (swapResult == null) {
1✔
1539
              delegate = Grpc.newChannelBuilder(target, channelCreds);
×
1540
              return;
×
1541
            } else {
1542
              transportFactory = swapResult.transportFactory;
1✔
1543
              callCredentials = swapResult.callCredentials;
1✔
1544
            }
1545
          }
1546
          ClientTransportFactoryBuilder transportFactoryBuilder =
1✔
1547
              new ClientTransportFactoryBuilder() {
1✔
1548
                @Override
1549
                public ClientTransportFactory buildClientTransportFactory() {
1550
                  return transportFactory;
1✔
1551
                }
1552
              };
1553
          delegate = new ManagedChannelImplBuilder(
1✔
1554
              target,
1555
              channelCreds,
1556
              callCredentials,
1557
              transportFactoryBuilder,
1558
              new FixedPortProvider(nameResolverArgs.getDefaultPort()))
1✔
1559
              .nameResolverRegistry(nameResolverRegistry);
1✔
1560
        }
1✔
1561

1562
        @Override
1563
        protected ManagedChannelBuilder<?> delegate() {
1564
          return delegate;
1✔
1565
        }
1566
      }
1567

1568
      checkState(!terminated, "Channel is terminated");
1✔
1569

1570
      @SuppressWarnings("deprecation")
1571
      ResolvingOobChannelBuilder builder = new ResolvingOobChannelBuilder();
1✔
1572

1573
      return builder
1✔
1574
          // TODO(zdapeng): executors should not outlive the parent channel.
1575
          .executor(executor)
1✔
1576
          .offloadExecutor(offloadExecutorHolder.getExecutor())
1✔
1577
          .maxTraceEvents(maxTraceEvents)
1✔
1578
          .proxyDetector(nameResolverArgs.getProxyDetector())
1✔
1579
          .userAgent(userAgent);
1✔
1580
    }
1581

1582
    @Override
1583
    public ChannelCredentials getUnsafeChannelCredentials() {
1584
      if (originalChannelCreds == null) {
1✔
1585
        return new DefaultChannelCreds();
1✔
1586
      }
1587
      return originalChannelCreds;
×
1588
    }
1589

1590
    @Override
1591
    public void updateOobChannelAddresses(ManagedChannel channel, EquivalentAddressGroup eag) {
1592
      updateOobChannelAddresses(channel, Collections.singletonList(eag));
×
1593
    }
×
1594

1595
    @Override
1596
    public void updateOobChannelAddresses(ManagedChannel channel,
1597
        List<EquivalentAddressGroup> eag) {
1598
      checkArgument(channel instanceof OobChannel,
1✔
1599
          "channel must have been returned from createOobChannel");
1600
      ((OobChannel) channel).updateAddresses(eag);
1✔
1601
    }
1✔
1602

1603
    @Override
1604
    public String getAuthority() {
1605
      return ManagedChannelImpl.this.authority();
1✔
1606
    }
1607

1608
    @Override
1609
    public String getChannelTarget() {
1610
      return targetUri.toString();
1✔
1611
    }
1612

1613
    @Override
1614
    public SynchronizationContext getSynchronizationContext() {
1615
      return syncContext;
1✔
1616
    }
1617

1618
    @Override
1619
    public ScheduledExecutorService getScheduledExecutorService() {
1620
      return scheduledExecutor;
1✔
1621
    }
1622

1623
    @Override
1624
    public ChannelLogger getChannelLogger() {
1625
      return channelLogger;
1✔
1626
    }
1627

1628
    @Override
1629
    public NameResolver.Args getNameResolverArgs() {
1630
      return nameResolverArgs;
1✔
1631
    }
1632

1633
    @Override
1634
    public NameResolverRegistry getNameResolverRegistry() {
1635
      return nameResolverRegistry;
1✔
1636
    }
1637

1638
    @Override
1639
    public MetricRecorder getMetricRecorder() {
1640
      return metricRecorder;
1✔
1641
    }
1642

1643
    /**
1644
     * A placeholder for channel creds if user did not specify channel creds for the channel.
1645
     */
1646
    // TODO(zdapeng): get rid of this class and let all ChannelBuilders always provide a non-null
1647
    //     channel creds.
1648
    final class DefaultChannelCreds extends ChannelCredentials {
1✔
1649
      @Override
1650
      public ChannelCredentials withoutBearerTokens() {
1651
        return this;
×
1652
      }
1653
    }
1654
  }
1655

1656
  final class NameResolverListener extends NameResolver.Listener2 {
1657
    final LbHelperImpl helper;
1658
    final NameResolver resolver;
1659

1660
    NameResolverListener(LbHelperImpl helperImpl, NameResolver resolver) {
1✔
1661
      this.helper = checkNotNull(helperImpl, "helperImpl");
1✔
1662
      this.resolver = checkNotNull(resolver, "resolver");
1✔
1663
    }
1✔
1664

1665
    @Override
1666
    public void onResult(final ResolutionResult resolutionResult) {
1667
      final class NamesResolved implements Runnable {
1✔
1668

1669
        @SuppressWarnings("ReferenceEquality")
1670
        @Override
1671
        public void run() {
1672
          if (ManagedChannelImpl.this.nameResolver != resolver) {
1✔
1673
            return;
1✔
1674
          }
1675

1676
          List<EquivalentAddressGroup> servers = resolutionResult.getAddresses();
1✔
1677
          channelLogger.log(
1✔
1678
              ChannelLogLevel.DEBUG,
1679
              "Resolved address: {0}, config={1}",
1680
              servers,
1681
              resolutionResult.getAttributes());
1✔
1682

1683
          if (lastResolutionState != ResolutionState.SUCCESS) {
1✔
1684
            channelLogger.log(ChannelLogLevel.INFO, "Address resolved: {0}", servers);
1✔
1685
            lastResolutionState = ResolutionState.SUCCESS;
1✔
1686
          }
1687

1688
          ConfigOrError configOrError = resolutionResult.getServiceConfig();
1✔
1689
          ResolutionResultListener resolutionResultListener = resolutionResult.getAttributes()
1✔
1690
              .get(RetryingNameResolver.RESOLUTION_RESULT_LISTENER_KEY);
1✔
1691
          InternalConfigSelector resolvedConfigSelector =
1✔
1692
              resolutionResult.getAttributes().get(InternalConfigSelector.KEY);
1✔
1693
          ManagedChannelServiceConfig validServiceConfig =
1694
              configOrError != null && configOrError.getConfig() != null
1✔
1695
                  ? (ManagedChannelServiceConfig) configOrError.getConfig()
1✔
1696
                  : null;
1✔
1697
          Status serviceConfigError = configOrError != null ? configOrError.getError() : null;
1✔
1698

1699
          ManagedChannelServiceConfig effectiveServiceConfig;
1700
          if (!lookUpServiceConfig) {
1✔
1701
            if (validServiceConfig != null) {
1✔
1702
              channelLogger.log(
1✔
1703
                  ChannelLogLevel.INFO,
1704
                  "Service config from name resolver discarded by channel settings");
1705
            }
1706
            effectiveServiceConfig =
1707
                defaultServiceConfig == null ? EMPTY_SERVICE_CONFIG : defaultServiceConfig;
1✔
1708
            if (resolvedConfigSelector != null) {
1✔
1709
              channelLogger.log(
1✔
1710
                  ChannelLogLevel.INFO,
1711
                  "Config selector from name resolver discarded by channel settings");
1712
            }
1713
            realChannel.updateConfigSelector(effectiveServiceConfig.getDefaultConfigSelector());
1✔
1714
          } else {
1715
            // Try to use config if returned from name resolver
1716
            // Otherwise, try to use the default config if available
1717
            if (validServiceConfig != null) {
1✔
1718
              effectiveServiceConfig = validServiceConfig;
1✔
1719
              if (resolvedConfigSelector != null) {
1✔
1720
                realChannel.updateConfigSelector(resolvedConfigSelector);
1✔
1721
                if (effectiveServiceConfig.getDefaultConfigSelector() != null) {
1✔
1722
                  channelLogger.log(
×
1723
                      ChannelLogLevel.DEBUG,
1724
                      "Method configs in service config will be discarded due to presence of"
1725
                          + "config-selector");
1726
                }
1727
              } else {
1728
                realChannel.updateConfigSelector(effectiveServiceConfig.getDefaultConfigSelector());
1✔
1729
              }
1730
            } else if (defaultServiceConfig != null) {
1✔
1731
              effectiveServiceConfig = defaultServiceConfig;
1✔
1732
              realChannel.updateConfigSelector(effectiveServiceConfig.getDefaultConfigSelector());
1✔
1733
              channelLogger.log(
1✔
1734
                  ChannelLogLevel.INFO,
1735
                  "Received no service config, using default service config");
1736
            } else if (serviceConfigError != null) {
1✔
1737
              if (!serviceConfigUpdated) {
1✔
1738
                // First DNS lookup has invalid service config, and cannot fall back to default
1739
                channelLogger.log(
1✔
1740
                    ChannelLogLevel.INFO,
1741
                    "Fallback to error due to invalid first service config without default config");
1742
                // This error could be an "inappropriate" control plane error that should not bleed
1743
                // through to client code using gRPC. We let them flow through here to the LB as
1744
                // we later check for these error codes when investigating pick results in
1745
                // GrpcUtil.getTransportFromPickResult().
1746
                onError(configOrError.getError());
1✔
1747
                if (resolutionResultListener != null) {
1✔
1748
                  resolutionResultListener.resolutionAttempted(configOrError.getError());
1✔
1749
                }
1750
                return;
1✔
1751
              } else {
1752
                effectiveServiceConfig = lastServiceConfig;
1✔
1753
              }
1754
            } else {
1755
              effectiveServiceConfig = EMPTY_SERVICE_CONFIG;
1✔
1756
              realChannel.updateConfigSelector(null);
1✔
1757
            }
1758
            if (!effectiveServiceConfig.equals(lastServiceConfig)) {
1✔
1759
              channelLogger.log(
1✔
1760
                  ChannelLogLevel.INFO,
1761
                  "Service config changed{0}",
1762
                  effectiveServiceConfig == EMPTY_SERVICE_CONFIG ? " to empty" : "");
1✔
1763
              lastServiceConfig = effectiveServiceConfig;
1✔
1764
              transportProvider.throttle = effectiveServiceConfig.getRetryThrottling();
1✔
1765
            }
1766

1767
            try {
1768
              // TODO(creamsoup): when `servers` is empty and lastResolutionStateCopy == SUCCESS
1769
              //  and lbNeedAddress, it shouldn't call the handleServiceConfigUpdate. But,
1770
              //  lbNeedAddress is not deterministic
1771
              serviceConfigUpdated = true;
1✔
1772
            } catch (RuntimeException re) {
×
1773
              logger.log(
×
1774
                  Level.WARNING,
1775
                  "[" + getLogId() + "] Unexpected exception from parsing service config",
×
1776
                  re);
1777
            }
1✔
1778
          }
1779

1780
          Attributes effectiveAttrs = resolutionResult.getAttributes();
1✔
1781
          // Call LB only if it's not shutdown.  If LB is shutdown, lbHelper won't match.
1782
          if (NameResolverListener.this.helper == ManagedChannelImpl.this.lbHelper) {
1✔
1783
            Attributes.Builder attrBuilder =
1✔
1784
                effectiveAttrs.toBuilder().discard(InternalConfigSelector.KEY);
1✔
1785
            Map<String, ?> healthCheckingConfig =
1✔
1786
                effectiveServiceConfig.getHealthCheckingConfig();
1✔
1787
            if (healthCheckingConfig != null) {
1✔
1788
              attrBuilder
1✔
1789
                  .set(LoadBalancer.ATTR_HEALTH_CHECKING_CONFIG, healthCheckingConfig)
1✔
1790
                  .build();
1✔
1791
            }
1792
            Attributes attributes = attrBuilder.build();
1✔
1793

1794
            Status addressAcceptanceStatus = helper.lb.tryAcceptResolvedAddresses(
1✔
1795
                ResolvedAddresses.newBuilder()
1✔
1796
                    .setAddresses(servers)
1✔
1797
                    .setAttributes(attributes)
1✔
1798
                    .setLoadBalancingPolicyConfig(effectiveServiceConfig.getLoadBalancingConfig())
1✔
1799
                    .build());
1✔
1800
            // If a listener is provided, let it know if the addresses were accepted.
1801
            if (resolutionResultListener != null) {
1✔
1802
              resolutionResultListener.resolutionAttempted(addressAcceptanceStatus);
1✔
1803
            }
1804
          }
1805
        }
1✔
1806
      }
1807

1808
      syncContext.execute(new NamesResolved());
1✔
1809
    }
1✔
1810

1811
    @Override
1812
    public void onError(final Status error) {
1813
      checkArgument(!error.isOk(), "the error status must not be OK");
1✔
1814
      final class NameResolverErrorHandler implements Runnable {
1✔
1815
        @Override
1816
        public void run() {
1817
          handleErrorInSyncContext(error);
1✔
1818
        }
1✔
1819
      }
1820

1821
      syncContext.execute(new NameResolverErrorHandler());
1✔
1822
    }
1✔
1823

1824
    private void handleErrorInSyncContext(Status error) {
1825
      logger.log(Level.WARNING, "[{0}] Failed to resolve name. status={1}",
1✔
1826
          new Object[] {getLogId(), error});
1✔
1827
      realChannel.onConfigError();
1✔
1828
      if (lastResolutionState != ResolutionState.ERROR) {
1✔
1829
        channelLogger.log(ChannelLogLevel.WARNING, "Failed to resolve name: {0}", error);
1✔
1830
        lastResolutionState = ResolutionState.ERROR;
1✔
1831
      }
1832
      // Call LB only if it's not shutdown.  If LB is shutdown, lbHelper won't match.
1833
      if (NameResolverListener.this.helper != ManagedChannelImpl.this.lbHelper) {
1✔
1834
        return;
1✔
1835
      }
1836

1837
      helper.lb.handleNameResolutionError(error);
1✔
1838
    }
1✔
1839
  }
1840

1841
  private final class SubchannelImpl extends AbstractSubchannel {
1842
    final CreateSubchannelArgs args;
1843
    final InternalLogId subchannelLogId;
1844
    final ChannelLoggerImpl subchannelLogger;
1845
    final ChannelTracer subchannelTracer;
1846
    List<EquivalentAddressGroup> addressGroups;
1847
    InternalSubchannel subchannel;
1848
    boolean started;
1849
    boolean shutdown;
1850
    ScheduledHandle delayedShutdownTask;
1851

1852
    SubchannelImpl(CreateSubchannelArgs args) {
1✔
1853
      checkNotNull(args, "args");
1✔
1854
      addressGroups = args.getAddresses();
1✔
1855
      if (authorityOverride != null) {
1✔
1856
        List<EquivalentAddressGroup> eagsWithoutOverrideAttr =
1✔
1857
            stripOverrideAuthorityAttributes(args.getAddresses());
1✔
1858
        args = args.toBuilder().setAddresses(eagsWithoutOverrideAttr).build();
1✔
1859
      }
1860
      this.args = args;
1✔
1861
      subchannelLogId = InternalLogId.allocate("Subchannel", /*details=*/ authority());
1✔
1862
      subchannelTracer = new ChannelTracer(
1✔
1863
          subchannelLogId, maxTraceEvents, timeProvider.currentTimeNanos(),
1✔
1864
          "Subchannel for " + args.getAddresses());
1✔
1865
      subchannelLogger = new ChannelLoggerImpl(subchannelTracer, timeProvider);
1✔
1866
    }
1✔
1867

1868
    @Override
1869
    public void start(final SubchannelStateListener listener) {
1870
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1871
      checkState(!started, "already started");
1✔
1872
      checkState(!shutdown, "already shutdown");
1✔
1873
      checkState(!terminating, "Channel is being terminated");
1✔
1874
      started = true;
1✔
1875
      final class ManagedInternalSubchannelCallback extends InternalSubchannel.Callback {
1✔
1876
        // All callbacks are run in syncContext
1877
        @Override
1878
        void onTerminated(InternalSubchannel is) {
1879
          subchannels.remove(is);
1✔
1880
          channelz.removeSubchannel(is);
1✔
1881
          maybeTerminateChannel();
1✔
1882
        }
1✔
1883

1884
        @Override
1885
        void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) {
1886
          checkState(listener != null, "listener is null");
1✔
1887
          listener.onSubchannelState(newState);
1✔
1888
        }
1✔
1889

1890
        @Override
1891
        void onInUse(InternalSubchannel is) {
1892
          inUseStateAggregator.updateObjectInUse(is, true);
1✔
1893
        }
1✔
1894

1895
        @Override
1896
        void onNotInUse(InternalSubchannel is) {
1897
          inUseStateAggregator.updateObjectInUse(is, false);
1✔
1898
        }
1✔
1899
      }
1900

1901
      final InternalSubchannel internalSubchannel = new InternalSubchannel(
1✔
1902
          args.getAddresses(),
1✔
1903
          authority(),
1✔
1904
          userAgent,
1✔
1905
          backoffPolicyProvider,
1✔
1906
          transportFactory,
1✔
1907
          transportFactory.getScheduledExecutorService(),
1✔
1908
          stopwatchSupplier,
1✔
1909
          syncContext,
1910
          new ManagedInternalSubchannelCallback(),
1911
          channelz,
1✔
1912
          callTracerFactory.create(),
1✔
1913
          subchannelTracer,
1914
          subchannelLogId,
1915
          subchannelLogger,
1916
          transportFilters);
1✔
1917

1918
      channelTracer.reportEvent(new ChannelTrace.Event.Builder()
1✔
1919
          .setDescription("Child Subchannel started")
1✔
1920
          .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
1✔
1921
          .setTimestampNanos(timeProvider.currentTimeNanos())
1✔
1922
          .setSubchannelRef(internalSubchannel)
1✔
1923
          .build());
1✔
1924

1925
      this.subchannel = internalSubchannel;
1✔
1926
      channelz.addSubchannel(internalSubchannel);
1✔
1927
      subchannels.add(internalSubchannel);
1✔
1928
    }
1✔
1929

1930
    @Override
1931
    InternalInstrumented<ChannelStats> getInstrumentedInternalSubchannel() {
1932
      checkState(started, "not started");
1✔
1933
      return subchannel;
1✔
1934
    }
1935

1936
    @Override
1937
    public void shutdown() {
1938
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1939
      if (subchannel == null) {
1✔
1940
        // start() was not successful
1941
        shutdown = true;
×
1942
        return;
×
1943
      }
1944
      if (shutdown) {
1✔
1945
        if (terminating && delayedShutdownTask != null) {
1✔
1946
          // shutdown() was previously called when terminating == false, thus a delayed shutdown()
1947
          // was scheduled.  Now since terminating == true, We should expedite the shutdown.
1948
          delayedShutdownTask.cancel();
×
1949
          delayedShutdownTask = null;
×
1950
          // Will fall through to the subchannel.shutdown() at the end.
1951
        } else {
1952
          return;
1✔
1953
        }
1954
      } else {
1955
        shutdown = true;
1✔
1956
      }
1957
      // Add a delay to shutdown to deal with the race between 1) a transport being picked and
1958
      // newStream() being called on it, and 2) its Subchannel is shut down by LoadBalancer (e.g.,
1959
      // because of address change, or because LoadBalancer is shutdown by Channel entering idle
1960
      // mode). If (2) wins, the app will see a spurious error. We work around this by delaying
1961
      // shutdown of Subchannel for a few seconds here.
1962
      //
1963
      // TODO(zhangkun83): consider a better approach
1964
      // (https://github.com/grpc/grpc-java/issues/2562).
1965
      if (!terminating) {
1✔
1966
        final class ShutdownSubchannel implements Runnable {
1✔
1967
          @Override
1968
          public void run() {
1969
            subchannel.shutdown(SUBCHANNEL_SHUTDOWN_STATUS);
1✔
1970
          }
1✔
1971
        }
1972

1973
        delayedShutdownTask = syncContext.schedule(
1✔
1974
            new LogExceptionRunnable(new ShutdownSubchannel()),
1975
            SUBCHANNEL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS,
1976
            transportFactory.getScheduledExecutorService());
1✔
1977
        return;
1✔
1978
      }
1979
      // When terminating == true, no more real streams will be created. It's safe and also
1980
      // desirable to shutdown timely.
1981
      subchannel.shutdown(SHUTDOWN_STATUS);
1✔
1982
    }
1✔
1983

1984
    @Override
1985
    public void requestConnection() {
1986
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1987
      checkState(started, "not started");
1✔
1988
      subchannel.obtainActiveTransport();
1✔
1989
    }
1✔
1990

1991
    @Override
1992
    public List<EquivalentAddressGroup> getAllAddresses() {
1993
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1994
      checkState(started, "not started");
1✔
1995
      return addressGroups;
1✔
1996
    }
1997

1998
    @Override
1999
    public Attributes getAttributes() {
2000
      return args.getAttributes();
1✔
2001
    }
2002

2003
    @Override
2004
    public String toString() {
2005
      return subchannelLogId.toString();
1✔
2006
    }
2007

2008
    @Override
2009
    public Channel asChannel() {
2010
      checkState(started, "not started");
1✔
2011
      return new SubchannelChannel(
1✔
2012
          subchannel, balancerRpcExecutorHolder.getExecutor(),
1✔
2013
          transportFactory.getScheduledExecutorService(),
1✔
2014
          callTracerFactory.create(),
1✔
2015
          new AtomicReference<InternalConfigSelector>(null));
2016
    }
2017

2018
    @Override
2019
    public Object getInternalSubchannel() {
2020
      checkState(started, "Subchannel is not started");
1✔
2021
      return subchannel;
1✔
2022
    }
2023

2024
    @Override
2025
    public ChannelLogger getChannelLogger() {
2026
      return subchannelLogger;
1✔
2027
    }
2028

2029
    @Override
2030
    public void updateAddresses(List<EquivalentAddressGroup> addrs) {
2031
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
2032
      addressGroups = addrs;
1✔
2033
      if (authorityOverride != null) {
1✔
2034
        addrs = stripOverrideAuthorityAttributes(addrs);
1✔
2035
      }
2036
      subchannel.updateAddresses(addrs);
1✔
2037
    }
1✔
2038

2039
    private List<EquivalentAddressGroup> stripOverrideAuthorityAttributes(
2040
        List<EquivalentAddressGroup> eags) {
2041
      List<EquivalentAddressGroup> eagsWithoutOverrideAttr = new ArrayList<>();
1✔
2042
      for (EquivalentAddressGroup eag : eags) {
1✔
2043
        EquivalentAddressGroup eagWithoutOverrideAttr = new EquivalentAddressGroup(
1✔
2044
            eag.getAddresses(),
1✔
2045
            eag.getAttributes().toBuilder().discard(ATTR_AUTHORITY_OVERRIDE).build());
1✔
2046
        eagsWithoutOverrideAttr.add(eagWithoutOverrideAttr);
1✔
2047
      }
1✔
2048
      return Collections.unmodifiableList(eagsWithoutOverrideAttr);
1✔
2049
    }
2050
  }
2051

2052
  @Override
2053
  public String toString() {
2054
    return MoreObjects.toStringHelper(this)
1✔
2055
        .add("logId", logId.getId())
1✔
2056
        .add("target", target)
1✔
2057
        .toString();
1✔
2058
  }
2059

2060
  /**
2061
   * Called from syncContext.
2062
   */
2063
  private final class DelayedTransportListener implements ManagedClientTransport.Listener {
1✔
2064
    @Override
2065
    public void transportShutdown(Status s) {
2066
      checkState(shutdown.get(), "Channel must have been shut down");
1✔
2067
    }
1✔
2068

2069
    @Override
2070
    public void transportReady() {
2071
      // Don't care
2072
    }
×
2073

2074
    @Override
2075
    public Attributes filterTransport(Attributes attributes) {
2076
      return attributes;
×
2077
    }
2078

2079
    @Override
2080
    public void transportInUse(final boolean inUse) {
2081
      inUseStateAggregator.updateObjectInUse(delayedTransport, inUse);
1✔
2082
      if (inUse) {
1✔
2083
        // It's possible to be in idle mode while inUseStateAggregator is in-use, if one of the
2084
        // subchannels is in use. But we should never be in idle mode when delayed transport is in
2085
        // use.
2086
        exitIdleMode();
1✔
2087
      }
2088
    }
1✔
2089

2090
    @Override
2091
    public void transportTerminated() {
2092
      checkState(shutdown.get(), "Channel must have been shut down");
1✔
2093
      terminating = true;
1✔
2094
      shutdownNameResolverAndLoadBalancer(false);
1✔
2095
      // No need to call channelStateManager since we are already in SHUTDOWN state.
2096
      // Until LoadBalancer is shutdown, it may still create new subchannels.  We catch them
2097
      // here.
2098
      maybeShutdownNowSubchannels();
1✔
2099
      maybeTerminateChannel();
1✔
2100
    }
1✔
2101
  }
2102

2103
  /**
2104
   * Must be accessed from syncContext.
2105
   */
2106
  private final class IdleModeStateAggregator extends InUseStateAggregator<Object> {
1✔
2107
    @Override
2108
    protected void handleInUse() {
2109
      exitIdleMode();
1✔
2110
    }
1✔
2111

2112
    @Override
2113
    protected void handleNotInUse() {
2114
      if (shutdown.get()) {
1✔
2115
        return;
1✔
2116
      }
2117
      rescheduleIdleTimer();
1✔
2118
    }
1✔
2119
  }
2120

2121
  /**
2122
   * Lazily request for Executor from an executor pool.
2123
   * Also act as an Executor directly to simply run a cmd
2124
   */
2125
  @VisibleForTesting
2126
  static final class ExecutorHolder implements Executor {
2127
    private final ObjectPool<? extends Executor> pool;
2128
    private Executor executor;
2129

2130
    ExecutorHolder(ObjectPool<? extends Executor> executorPool) {
1✔
2131
      this.pool = checkNotNull(executorPool, "executorPool");
1✔
2132
    }
1✔
2133

2134
    synchronized Executor getExecutor() {
2135
      if (executor == null) {
1✔
2136
        executor = checkNotNull(pool.getObject(), "%s.getObject()", executor);
1✔
2137
      }
2138
      return executor;
1✔
2139
    }
2140

2141
    synchronized void release() {
2142
      if (executor != null) {
1✔
2143
        executor = pool.returnObject(executor);
1✔
2144
      }
2145
    }
1✔
2146

2147
    @Override
2148
    public void execute(Runnable command) {
2149
      getExecutor().execute(command);
1✔
2150
    }
1✔
2151
  }
2152

2153
  private static final class RestrictedScheduledExecutor implements ScheduledExecutorService {
2154
    final ScheduledExecutorService delegate;
2155

2156
    private RestrictedScheduledExecutor(ScheduledExecutorService delegate) {
1✔
2157
      this.delegate = checkNotNull(delegate, "delegate");
1✔
2158
    }
1✔
2159

2160
    @Override
2161
    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
2162
      return delegate.schedule(callable, delay, unit);
×
2163
    }
2164

2165
    @Override
2166
    public ScheduledFuture<?> schedule(Runnable cmd, long delay, TimeUnit unit) {
2167
      return delegate.schedule(cmd, delay, unit);
1✔
2168
    }
2169

2170
    @Override
2171
    public ScheduledFuture<?> scheduleAtFixedRate(
2172
        Runnable command, long initialDelay, long period, TimeUnit unit) {
2173
      return delegate.scheduleAtFixedRate(command, initialDelay, period, unit);
1✔
2174
    }
2175

2176
    @Override
2177
    public ScheduledFuture<?> scheduleWithFixedDelay(
2178
        Runnable command, long initialDelay, long delay, TimeUnit unit) {
2179
      return delegate.scheduleWithFixedDelay(command, initialDelay, delay, unit);
×
2180
    }
2181

2182
    @Override
2183
    public boolean awaitTermination(long timeout, TimeUnit unit)
2184
        throws InterruptedException {
2185
      return delegate.awaitTermination(timeout, unit);
×
2186
    }
2187

2188
    @Override
2189
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
2190
        throws InterruptedException {
2191
      return delegate.invokeAll(tasks);
×
2192
    }
2193

2194
    @Override
2195
    public <T> List<Future<T>> invokeAll(
2196
        Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
2197
        throws InterruptedException {
2198
      return delegate.invokeAll(tasks, timeout, unit);
×
2199
    }
2200

2201
    @Override
2202
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
2203
        throws InterruptedException, ExecutionException {
2204
      return delegate.invokeAny(tasks);
×
2205
    }
2206

2207
    @Override
2208
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
2209
        throws InterruptedException, ExecutionException, TimeoutException {
2210
      return delegate.invokeAny(tasks, timeout, unit);
×
2211
    }
2212

2213
    @Override
2214
    public boolean isShutdown() {
2215
      return delegate.isShutdown();
×
2216
    }
2217

2218
    @Override
2219
    public boolean isTerminated() {
2220
      return delegate.isTerminated();
×
2221
    }
2222

2223
    @Override
2224
    public void shutdown() {
2225
      throw new UnsupportedOperationException("Restricted: shutdown() is not allowed");
1✔
2226
    }
2227

2228
    @Override
2229
    public List<Runnable> shutdownNow() {
2230
      throw new UnsupportedOperationException("Restricted: shutdownNow() is not allowed");
1✔
2231
    }
2232

2233
    @Override
2234
    public <T> Future<T> submit(Callable<T> task) {
2235
      return delegate.submit(task);
×
2236
    }
2237

2238
    @Override
2239
    public Future<?> submit(Runnable task) {
2240
      return delegate.submit(task);
×
2241
    }
2242

2243
    @Override
2244
    public <T> Future<T> submit(Runnable task, T result) {
2245
      return delegate.submit(task, result);
×
2246
    }
2247

2248
    @Override
2249
    public void execute(Runnable command) {
2250
      delegate.execute(command);
×
2251
    }
×
2252
  }
2253

2254
  /**
   * A ResolutionState indicates the status of last name resolution.
   */
  enum ResolutionState {
    /** Presumably the state before any resolution outcome was recorded; confirm at the field. */
    NO_RESOLUTION,
    /** The last name resolution delivered a result. */
    SUCCESS,
    /** The last name resolution reported an error. */
    ERROR
  }
2262
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc