• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19237

14 May 2024 06:37PM UTC coverage: 88.424% (-0.03%) from 88.45%
#19237

push

github

ejona86
core: Fully delegate picks to DelayedClientTransport

DelayedClientTransport already had to handle all the cases, so
ManagedChannelImpl picking was acting only as an optimization.
Optimizing DelayedClientTransport to avoid the lock when not queuing
makes ManagedChannelImpl picking entirely redundant, and allows us to
remove the duplicate race-handling logic.

This avoids double-picking when queuing, where ManagedChannelImpl does a
pick, decides to queue, and then DelayedClientTransport re-performs the
pick because it doesn't know which pick version was used. This was
noticed with RLS, which mutates state within the picker.

32035 of 36229 relevant lines covered (88.42%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.76
/../core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
1
/*
2
 * Copyright 2016 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static com.google.common.base.Preconditions.checkState;
22
import static io.grpc.ClientStreamTracer.NAME_RESOLUTION_DELAYED;
23
import static io.grpc.ConnectivityState.CONNECTING;
24
import static io.grpc.ConnectivityState.IDLE;
25
import static io.grpc.ConnectivityState.SHUTDOWN;
26
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
27
import static io.grpc.EquivalentAddressGroup.ATTR_AUTHORITY_OVERRIDE;
28

29
import com.google.common.annotations.VisibleForTesting;
30
import com.google.common.base.MoreObjects;
31
import com.google.common.base.Stopwatch;
32
import com.google.common.base.Supplier;
33
import com.google.common.util.concurrent.ListenableFuture;
34
import com.google.common.util.concurrent.SettableFuture;
35
import io.grpc.Attributes;
36
import io.grpc.CallCredentials;
37
import io.grpc.CallOptions;
38
import io.grpc.Channel;
39
import io.grpc.ChannelCredentials;
40
import io.grpc.ChannelLogger;
41
import io.grpc.ChannelLogger.ChannelLogLevel;
42
import io.grpc.ClientCall;
43
import io.grpc.ClientInterceptor;
44
import io.grpc.ClientInterceptors;
45
import io.grpc.ClientStreamTracer;
46
import io.grpc.ClientTransportFilter;
47
import io.grpc.CompressorRegistry;
48
import io.grpc.ConnectivityState;
49
import io.grpc.ConnectivityStateInfo;
50
import io.grpc.Context;
51
import io.grpc.Deadline;
52
import io.grpc.DecompressorRegistry;
53
import io.grpc.EquivalentAddressGroup;
54
import io.grpc.ForwardingChannelBuilder2;
55
import io.grpc.ForwardingClientCall;
56
import io.grpc.Grpc;
57
import io.grpc.InternalChannelz;
58
import io.grpc.InternalChannelz.ChannelStats;
59
import io.grpc.InternalChannelz.ChannelTrace;
60
import io.grpc.InternalConfigSelector;
61
import io.grpc.InternalInstrumented;
62
import io.grpc.InternalLogId;
63
import io.grpc.InternalWithLogId;
64
import io.grpc.LoadBalancer;
65
import io.grpc.LoadBalancer.CreateSubchannelArgs;
66
import io.grpc.LoadBalancer.PickResult;
67
import io.grpc.LoadBalancer.PickSubchannelArgs;
68
import io.grpc.LoadBalancer.ResolvedAddresses;
69
import io.grpc.LoadBalancer.SubchannelPicker;
70
import io.grpc.LoadBalancer.SubchannelStateListener;
71
import io.grpc.ManagedChannel;
72
import io.grpc.ManagedChannelBuilder;
73
import io.grpc.Metadata;
74
import io.grpc.MethodDescriptor;
75
import io.grpc.MetricInstrumentRegistry;
76
import io.grpc.MetricRecorder;
77
import io.grpc.NameResolver;
78
import io.grpc.NameResolver.ConfigOrError;
79
import io.grpc.NameResolver.ResolutionResult;
80
import io.grpc.NameResolverProvider;
81
import io.grpc.NameResolverRegistry;
82
import io.grpc.ProxyDetector;
83
import io.grpc.Status;
84
import io.grpc.SynchronizationContext;
85
import io.grpc.SynchronizationContext.ScheduledHandle;
86
import io.grpc.internal.AutoConfiguredLoadBalancerFactory.AutoConfiguredLoadBalancer;
87
import io.grpc.internal.ClientCallImpl.ClientStreamProvider;
88
import io.grpc.internal.ClientTransportFactory.SwapChannelCredentialsResult;
89
import io.grpc.internal.ManagedChannelImplBuilder.ClientTransportFactoryBuilder;
90
import io.grpc.internal.ManagedChannelImplBuilder.FixedPortProvider;
91
import io.grpc.internal.ManagedChannelServiceConfig.MethodInfo;
92
import io.grpc.internal.ManagedChannelServiceConfig.ServiceConfigConvertedSelector;
93
import io.grpc.internal.RetriableStream.ChannelBufferMeter;
94
import io.grpc.internal.RetriableStream.Throttle;
95
import io.grpc.internal.RetryingNameResolver.ResolutionResultListener;
96
import java.net.URI;
97
import java.util.ArrayList;
98
import java.util.Collection;
99
import java.util.Collections;
100
import java.util.HashSet;
101
import java.util.LinkedHashSet;
102
import java.util.List;
103
import java.util.Map;
104
import java.util.Set;
105
import java.util.concurrent.Callable;
106
import java.util.concurrent.CountDownLatch;
107
import java.util.concurrent.ExecutionException;
108
import java.util.concurrent.Executor;
109
import java.util.concurrent.Future;
110
import java.util.concurrent.ScheduledExecutorService;
111
import java.util.concurrent.ScheduledFuture;
112
import java.util.concurrent.TimeUnit;
113
import java.util.concurrent.TimeoutException;
114
import java.util.concurrent.atomic.AtomicBoolean;
115
import java.util.concurrent.atomic.AtomicReference;
116
import java.util.logging.Level;
117
import java.util.logging.Logger;
118
import javax.annotation.Nullable;
119
import javax.annotation.concurrent.GuardedBy;
120
import javax.annotation.concurrent.ThreadSafe;
121

122
/** A communication channel for making outgoing RPCs. */
123
@ThreadSafe
124
final class ManagedChannelImpl extends ManagedChannel implements
125
    InternalInstrumented<ChannelStats> {
126
  @VisibleForTesting
127
  static final Logger logger = Logger.getLogger(ManagedChannelImpl.class.getName());
1✔
128

129
  static final long IDLE_TIMEOUT_MILLIS_DISABLE = -1;
130

131
  static final long SUBCHANNEL_SHUTDOWN_DELAY_SECONDS = 5;
132

133
  @VisibleForTesting
134
  static final Status SHUTDOWN_NOW_STATUS =
1✔
135
      Status.UNAVAILABLE.withDescription("Channel shutdownNow invoked");
1✔
136

137
  @VisibleForTesting
138
  static final Status SHUTDOWN_STATUS =
1✔
139
      Status.UNAVAILABLE.withDescription("Channel shutdown invoked");
1✔
140

141
  @VisibleForTesting
142
  static final Status SUBCHANNEL_SHUTDOWN_STATUS =
1✔
143
      Status.UNAVAILABLE.withDescription("Subchannel shutdown invoked");
1✔
144

145
  private static final ManagedChannelServiceConfig EMPTY_SERVICE_CONFIG =
146
      ManagedChannelServiceConfig.empty();
1✔
147
  private static final InternalConfigSelector INITIAL_PENDING_SELECTOR =
1✔
148
      new InternalConfigSelector() {
1✔
149
        @Override
150
        public Result selectConfig(PickSubchannelArgs args) {
151
          throw new IllegalStateException("Resolution is pending");
×
152
        }
153
      };
154
  private static final LoadBalancer.PickDetailsConsumer NOOP_PICK_DETAILS_CONSUMER =
1✔
155
      new LoadBalancer.PickDetailsConsumer() {};
1✔
156

157
  private final InternalLogId logId;
158
  private final String target;
159
  @Nullable
160
  private final String authorityOverride;
161
  private final NameResolverRegistry nameResolverRegistry;
162
  private final URI targetUri;
163
  private final NameResolverProvider nameResolverProvider;
164
  private final NameResolver.Args nameResolverArgs;
165
  private final AutoConfiguredLoadBalancerFactory loadBalancerFactory;
166
  private final ClientTransportFactory originalTransportFactory;
167
  @Nullable
168
  private final ChannelCredentials originalChannelCreds;
169
  private final ClientTransportFactory transportFactory;
170
  private final ClientTransportFactory oobTransportFactory;
171
  private final RestrictedScheduledExecutor scheduledExecutor;
172
  private final Executor executor;
173
  private final ObjectPool<? extends Executor> executorPool;
174
  private final ObjectPool<? extends Executor> balancerRpcExecutorPool;
175
  private final ExecutorHolder balancerRpcExecutorHolder;
176
  private final ExecutorHolder offloadExecutorHolder;
177
  private final TimeProvider timeProvider;
178
  private final int maxTraceEvents;
179

180
  @VisibleForTesting
1✔
181
  final SynchronizationContext syncContext = new SynchronizationContext(
182
      new Thread.UncaughtExceptionHandler() {
1✔
183
        @Override
184
        public void uncaughtException(Thread t, Throwable e) {
185
          logger.log(
1✔
186
              Level.SEVERE,
187
              "[" + getLogId() + "] Uncaught exception in the SynchronizationContext. Panic!",
1✔
188
              e);
189
          panic(e);
1✔
190
        }
1✔
191
      });
192

193
  private boolean fullStreamDecompression;
194

195
  private final DecompressorRegistry decompressorRegistry;
196
  private final CompressorRegistry compressorRegistry;
197

198
  private final Supplier<Stopwatch> stopwatchSupplier;
199
  /** The timeout before entering idle mode. */
200
  private final long idleTimeoutMillis;
201

202
  private final ConnectivityStateManager channelStateManager = new ConnectivityStateManager();
1✔
203
  private final BackoffPolicy.Provider backoffPolicyProvider;
204

205
  /**
206
   * We delegate to this channel, so that we can have interceptors as necessary. If there aren't
207
   * any interceptors and the {@link io.grpc.BinaryLog} is {@code null} then this will just be a
208
   * {@link RealChannel}.
209
   */
210
  private final Channel interceptorChannel;
211

212
  private final List<ClientTransportFilter> transportFilters;
213
  @Nullable private final String userAgent;
214

215
  // Only null after channel is terminated. Must be assigned from the syncContext.
216
  private NameResolver nameResolver;
217

218
  // Must be accessed from the syncContext.
219
  private boolean nameResolverStarted;
220

221
  // null when channel is in idle mode.  Must be assigned from syncContext.
222
  @Nullable
223
  private LbHelperImpl lbHelper;
224

225
  // Must ONLY be assigned from updateSubchannelPicker(), which is called from syncContext.
226
  // null if channel is in idle mode.
227
  @Nullable
228
  private volatile SubchannelPicker subchannelPicker;
229

230
  // Must be accessed from the syncContext
231
  private boolean panicMode;
232

233
  // Must be mutated from syncContext
234
  // If any monitoring hook to be added later needs to get a snapshot of this Set, we could
235
  // switch to a ConcurrentHashMap.
236
  private final Set<InternalSubchannel> subchannels = new HashSet<>(16, .75f);
1✔
237

238
  // Must be accessed from syncContext
239
  @Nullable
240
  private Collection<RealChannel.PendingCall<?, ?>> pendingCalls;
241
  private final Object pendingCallsInUseObject = new Object();
1✔
242

243
  // Must be mutated from syncContext
244
  private final Set<OobChannel> oobChannels = new HashSet<>(1, .75f);
1✔
245

246
  // reprocess() must be run from syncContext
247
  private final DelayedClientTransport delayedTransport;
248
  private final UncommittedRetriableStreamsRegistry uncommittedRetriableStreamsRegistry
1✔
249
      = new UncommittedRetriableStreamsRegistry();
250

251
  // Shutdown states.
252
  //
253
  // Channel's shutdown process:
254
  // 1. shutdown(): stop accepting new calls from applications
255
  //   1a shutdown <- true
256
  //   1b subchannelPicker <- null
257
  //   1c delayedTransport.shutdown()
258
  // 2. delayedTransport terminated: stop stream-creation functionality
259
  //   2a terminating <- true
260
  //   2b loadBalancer.shutdown()
261
  //     * LoadBalancer will shutdown subchannels and OOB channels
262
  //   2c loadBalancer <- null
263
  //   2d nameResolver.shutdown()
264
  //   2e nameResolver <- null
265
  // 3. All subchannels and OOB channels terminated: Channel considered terminated
266

267
  private final AtomicBoolean shutdown = new AtomicBoolean(false);
1✔
268
  // Must only be mutated and read from syncContext
269
  private boolean shutdownNowed;
270
  // Must only be mutated from syncContext
271
  private boolean terminating;
272
  // Must be mutated from syncContext
273
  private volatile boolean terminated;
274
  private final CountDownLatch terminatedLatch = new CountDownLatch(1);
1✔
275

276
  private final CallTracer.Factory callTracerFactory;
277
  private final CallTracer channelCallTracer;
278
  private final ChannelTracer channelTracer;
279
  private final ChannelLogger channelLogger;
280
  private final InternalChannelz channelz;
281
  private final RealChannel realChannel;
282
  // Must be mutated and read from syncContext
283
  // a flag for doing channel tracing when flipped
284
  private ResolutionState lastResolutionState = ResolutionState.NO_RESOLUTION;
1✔
285
  // Must be mutated and read from constructor or syncContext
286
  // used for channel tracing when value changed
287
  private ManagedChannelServiceConfig lastServiceConfig = EMPTY_SERVICE_CONFIG;
1✔
288

289
  @Nullable
290
  private final ManagedChannelServiceConfig defaultServiceConfig;
291
  // Must be mutated and read from constructor or syncContext
292
  private boolean serviceConfigUpdated = false;
1✔
293
  private final boolean lookUpServiceConfig;
294

295
  // One instance per channel.
296
  private final ChannelBufferMeter channelBufferUsed = new ChannelBufferMeter();
1✔
297

298
  private final long perRpcBufferLimit;
299
  private final long channelBufferLimit;
300

301
  // Temporary false flag that can skip the retry code path.
302
  private final boolean retryEnabled;
303

304
  private final Deadline.Ticker ticker = Deadline.getSystemTicker();
1✔
305

306
  // Called from syncContext
307
  private final ManagedClientTransport.Listener delayedTransportListener =
1✔
308
      new DelayedTransportListener();
309

310
  // Must be called from syncContext
311
  private void maybeShutdownNowSubchannels() {
312
    if (shutdownNowed) {
1✔
313
      for (InternalSubchannel subchannel : subchannels) {
1✔
314
        subchannel.shutdownNow(SHUTDOWN_NOW_STATUS);
1✔
315
      }
1✔
316
      for (OobChannel oobChannel : oobChannels) {
1✔
317
        oobChannel.getInternalSubchannel().shutdownNow(SHUTDOWN_NOW_STATUS);
1✔
318
      }
1✔
319
    }
320
  }
1✔
321

322
  // Must be accessed from syncContext
323
  @VisibleForTesting
1✔
324
  final InUseStateAggregator<Object> inUseStateAggregator = new IdleModeStateAggregator();
325

326
  @Override
327
  public ListenableFuture<ChannelStats> getStats() {
328
    final SettableFuture<ChannelStats> ret = SettableFuture.create();
1✔
329
    final class StatsFetcher implements Runnable {
1✔
330
      @Override
331
      public void run() {
332
        ChannelStats.Builder builder = new InternalChannelz.ChannelStats.Builder();
1✔
333
        channelCallTracer.updateBuilder(builder);
1✔
334
        channelTracer.updateBuilder(builder);
1✔
335
        builder.setTarget(target).setState(channelStateManager.getState());
1✔
336
        List<InternalWithLogId> children = new ArrayList<>();
1✔
337
        children.addAll(subchannels);
1✔
338
        children.addAll(oobChannels);
1✔
339
        builder.setSubchannels(children);
1✔
340
        ret.set(builder.build());
1✔
341
      }
1✔
342
    }
343

344
    // subchannels and oobchannels can only be accessed from syncContext
345
    syncContext.execute(new StatsFetcher());
1✔
346
    return ret;
1✔
347
  }
348

349
  @Override
350
  public InternalLogId getLogId() {
351
    return logId;
1✔
352
  }
353

354
  // Run from syncContext
355
  private class IdleModeTimer implements Runnable {
1✔
356

357
    @Override
358
    public void run() {
359
      // Workaround timer scheduled while in idle mode. This can happen from handleNotInUse() after
360
      // an explicit enterIdleMode() by the user. Protecting here as other locations are a bit too
361
      // subtle to change rapidly to resolve the channel panic. See #8714
362
      if (lbHelper == null) {
1✔
363
        return;
×
364
      }
365
      enterIdleMode();
1✔
366
    }
1✔
367
  }
368

369
  // Must be called from syncContext
370
  private void shutdownNameResolverAndLoadBalancer(boolean channelIsActive) {
371
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
372
    if (channelIsActive) {
1✔
373
      checkState(nameResolverStarted, "nameResolver is not started");
1✔
374
      checkState(lbHelper != null, "lbHelper is null");
1✔
375
    }
376
    if (nameResolver != null) {
1✔
377
      nameResolver.shutdown();
1✔
378
      nameResolverStarted = false;
1✔
379
      if (channelIsActive) {
1✔
380
        nameResolver = getNameResolver(
1✔
381
            targetUri, authorityOverride, nameResolverProvider, nameResolverArgs);
382
      } else {
383
        nameResolver = null;
1✔
384
      }
385
    }
386
    if (lbHelper != null) {
1✔
387
      lbHelper.lb.shutdown();
1✔
388
      lbHelper = null;
1✔
389
    }
390
    subchannelPicker = null;
1✔
391
  }
1✔
392

393
  /**
394
   * Make the channel exit idle mode, if it's in it.
395
   *
396
   * <p>Must be called from syncContext
397
   */
398
  @VisibleForTesting
399
  void exitIdleMode() {
400
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
401
    if (shutdown.get() || panicMode) {
1✔
402
      return;
1✔
403
    }
404
    if (inUseStateAggregator.isInUse()) {
1✔
405
      // Cancel the timer now, so that a racing due timer will not put Channel on idleness
406
      // when the caller of exitIdleMode() is about to use the returned loadBalancer.
407
      cancelIdleTimer(false);
1✔
408
    } else {
409
      // exitIdleMode() may be called outside of inUseStateAggregator.handleNotInUse() while
410
      // isInUse() == false, in which case we still need to schedule the timer.
411
      rescheduleIdleTimer();
1✔
412
    }
413
    if (lbHelper != null) {
1✔
414
      return;
1✔
415
    }
416
    channelLogger.log(ChannelLogLevel.INFO, "Exiting idle mode");
1✔
417
    LbHelperImpl lbHelper = new LbHelperImpl();
1✔
418
    lbHelper.lb = loadBalancerFactory.newLoadBalancer(lbHelper);
1✔
419
    // Delay setting lbHelper until fully initialized, since loadBalancerFactory is user code and
420
    // may throw. We don't want to confuse our state, even if we will enter panic mode.
421
    this.lbHelper = lbHelper;
1✔
422

423
    channelStateManager.gotoState(CONNECTING);
1✔
424
    NameResolverListener listener = new NameResolverListener(lbHelper, nameResolver);
1✔
425
    nameResolver.start(listener);
1✔
426
    nameResolverStarted = true;
1✔
427
  }
1✔
428

429
  // Must be run from syncContext
430
  private void enterIdleMode() {
431
    // nameResolver and loadBalancer are guaranteed to be non-null.  If any of them were null,
432
    // either the idleModeTimer ran twice without exiting the idle mode, or the task in shutdown()
433
    // did not cancel idleModeTimer, or enterIdle() ran while shutdown or in idle, all of
434
    // which are bugs.
435
    shutdownNameResolverAndLoadBalancer(true);
1✔
436
    delayedTransport.reprocess(null);
1✔
437
    channelLogger.log(ChannelLogLevel.INFO, "Entering IDLE state");
1✔
438
    channelStateManager.gotoState(IDLE);
1✔
439
    // If the inUseStateAggregator still considers pending calls to be queued up or the delayed
440
    // transport to be holding some we need to exit idle mode to give these calls a chance to
441
    // be processed.
442
    if (inUseStateAggregator.anyObjectInUse(pendingCallsInUseObject, delayedTransport)) {
1✔
443
      exitIdleMode();
1✔
444
    }
445
  }
1✔
446

447
  // Must be run from syncContext
448
  private void cancelIdleTimer(boolean permanent) {
449
    idleTimer.cancel(permanent);
1✔
450
  }
1✔
451

452
  // Always run from syncContext
453
  private void rescheduleIdleTimer() {
454
    if (idleTimeoutMillis == IDLE_TIMEOUT_MILLIS_DISABLE) {
1✔
455
      return;
1✔
456
    }
457
    idleTimer.reschedule(idleTimeoutMillis, TimeUnit.MILLISECONDS);
1✔
458
  }
1✔
459

460
  /**
461
   * Force name resolution refresh to happen immediately. Must be run
462
   * from syncContext.
463
   */
464
  private void refreshNameResolution() {
465
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
466
    if (nameResolverStarted) {
1✔
467
      nameResolver.refresh();
1✔
468
    }
469
  }
1✔
470

471
  private final class ChannelStreamProvider implements ClientStreamProvider {
1✔
472
    volatile Throttle throttle;
473

474
    @Override
475
    public ClientStream newStream(
476
        final MethodDescriptor<?, ?> method,
477
        final CallOptions callOptions,
478
        final Metadata headers,
479
        final Context context) {
480
      // There is no need to reschedule the idle timer here. If the channel isn't shut down, either
481
      // the delayed transport or a real transport will go in-use and cancel the idle timer.
482
      if (!retryEnabled) {
1✔
483
        ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers(
1✔
484
            callOptions, headers, 0, /* isTransparentRetry= */ false);
485
        Context origContext = context.attach();
1✔
486
        try {
487
          return delayedTransport.newStream(method, headers, callOptions, tracers);
1✔
488
        } finally {
489
          context.detach(origContext);
1✔
490
        }
491
      } else {
492
        MethodInfo methodInfo = callOptions.getOption(MethodInfo.KEY);
1✔
493
        final RetryPolicy retryPolicy = methodInfo == null ? null : methodInfo.retryPolicy;
1✔
494
        final HedgingPolicy hedgingPolicy = methodInfo == null ? null : methodInfo.hedgingPolicy;
1✔
495
        final class RetryStream<ReqT> extends RetriableStream<ReqT> {
496
          @SuppressWarnings("unchecked")
497
          RetryStream() {
1✔
498
            super(
1✔
499
                (MethodDescriptor<ReqT, ?>) method,
500
                headers,
501
                channelBufferUsed,
1✔
502
                perRpcBufferLimit,
1✔
503
                channelBufferLimit,
1✔
504
                getCallExecutor(callOptions),
1✔
505
                transportFactory.getScheduledExecutorService(),
1✔
506
                retryPolicy,
507
                hedgingPolicy,
508
                throttle);
509
          }
1✔
510

511
          @Override
512
          Status prestart() {
513
            return uncommittedRetriableStreamsRegistry.add(this);
1✔
514
          }
515

516
          @Override
517
          void postCommit() {
518
            uncommittedRetriableStreamsRegistry.remove(this);
1✔
519
          }
1✔
520

521
          @Override
522
          ClientStream newSubstream(
523
              Metadata newHeaders, ClientStreamTracer.Factory factory, int previousAttempts,
524
              boolean isTransparentRetry) {
525
            CallOptions newOptions = callOptions.withStreamTracerFactory(factory);
1✔
526
            ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers(
1✔
527
                newOptions, newHeaders, previousAttempts, isTransparentRetry);
528
            Context origContext = context.attach();
1✔
529
            try {
530
              return delayedTransport.newStream(method, newHeaders, newOptions, tracers);
1✔
531
            } finally {
532
              context.detach(origContext);
1✔
533
            }
534
          }
535
        }
536

537
        return new RetryStream<>();
1✔
538
      }
539
    }
540
  }
541

542
  private final ChannelStreamProvider transportProvider = new ChannelStreamProvider();
1✔
543

544
  private final Rescheduler idleTimer;
545
  private final MetricRecorder metricRecorder;
546

547
  ManagedChannelImpl(
548
      ManagedChannelImplBuilder builder,
549
      ClientTransportFactory clientTransportFactory,
550
      URI targetUri,
551
      NameResolverProvider nameResolverProvider,
552
      BackoffPolicy.Provider backoffPolicyProvider,
553
      ObjectPool<? extends Executor> balancerRpcExecutorPool,
554
      Supplier<Stopwatch> stopwatchSupplier,
555
      List<ClientInterceptor> interceptors,
556
      final TimeProvider timeProvider) {
1✔
557
    this.target = checkNotNull(builder.target, "target");
1✔
558
    this.logId = InternalLogId.allocate("Channel", target);
1✔
559
    this.timeProvider = checkNotNull(timeProvider, "timeProvider");
1✔
560
    this.executorPool = checkNotNull(builder.executorPool, "executorPool");
1✔
561
    this.executor = checkNotNull(executorPool.getObject(), "executor");
1✔
562
    this.originalChannelCreds = builder.channelCredentials;
1✔
563
    this.originalTransportFactory = clientTransportFactory;
1✔
564
    this.offloadExecutorHolder =
1✔
565
        new ExecutorHolder(checkNotNull(builder.offloadExecutorPool, "offloadExecutorPool"));
1✔
566
    this.transportFactory = new CallCredentialsApplyingTransportFactory(
1✔
567
        clientTransportFactory, builder.callCredentials, this.offloadExecutorHolder);
568
    this.oobTransportFactory = new CallCredentialsApplyingTransportFactory(
1✔
569
        clientTransportFactory, null, this.offloadExecutorHolder);
570
    this.scheduledExecutor =
1✔
571
        new RestrictedScheduledExecutor(transportFactory.getScheduledExecutorService());
1✔
572
    maxTraceEvents = builder.maxTraceEvents;
1✔
573
    channelTracer = new ChannelTracer(
1✔
574
        logId, builder.maxTraceEvents, timeProvider.currentTimeNanos(),
1✔
575
        "Channel for '" + target + "'");
576
    channelLogger = new ChannelLoggerImpl(channelTracer, timeProvider);
1✔
577
    ProxyDetector proxyDetector =
578
        builder.proxyDetector != null ? builder.proxyDetector : GrpcUtil.DEFAULT_PROXY_DETECTOR;
1✔
579
    this.retryEnabled = builder.retryEnabled;
1✔
580
    this.loadBalancerFactory = new AutoConfiguredLoadBalancerFactory(builder.defaultLbPolicy);
1✔
581
    this.nameResolverRegistry = builder.nameResolverRegistry;
1✔
582
    this.targetUri = checkNotNull(targetUri, "targetUri");
1✔
583
    this.nameResolverProvider = checkNotNull(nameResolverProvider, "nameResolverProvider");
1✔
584
    ScParser serviceConfigParser =
1✔
585
        new ScParser(
586
            retryEnabled,
587
            builder.maxRetryAttempts,
588
            builder.maxHedgedAttempts,
589
            loadBalancerFactory);
590
    this.authorityOverride = builder.authorityOverride;
1✔
591
    this.nameResolverArgs =
1✔
592
        NameResolver.Args.newBuilder()
1✔
593
            .setDefaultPort(builder.getDefaultPort())
1✔
594
            .setProxyDetector(proxyDetector)
1✔
595
            .setSynchronizationContext(syncContext)
1✔
596
            .setScheduledExecutorService(scheduledExecutor)
1✔
597
            .setServiceConfigParser(serviceConfigParser)
1✔
598
            .setChannelLogger(channelLogger)
1✔
599
            .setOffloadExecutor(this.offloadExecutorHolder)
1✔
600
            .setOverrideAuthority(this.authorityOverride)
1✔
601
            .build();
1✔
602
    this.nameResolver = getNameResolver(
1✔
603
        targetUri, authorityOverride, nameResolverProvider, nameResolverArgs);
604
    this.balancerRpcExecutorPool = checkNotNull(balancerRpcExecutorPool, "balancerRpcExecutorPool");
1✔
605
    this.balancerRpcExecutorHolder = new ExecutorHolder(balancerRpcExecutorPool);
1✔
606
    this.delayedTransport = new DelayedClientTransport(this.executor, this.syncContext);
1✔
607
    this.delayedTransport.start(delayedTransportListener);
1✔
608
    this.backoffPolicyProvider = backoffPolicyProvider;
1✔
609

610
    if (builder.defaultServiceConfig != null) {
1✔
611
      ConfigOrError parsedDefaultServiceConfig =
1✔
612
          serviceConfigParser.parseServiceConfig(builder.defaultServiceConfig);
1✔
613
      checkState(
1✔
614
          parsedDefaultServiceConfig.getError() == null,
1✔
615
          "Default config is invalid: %s",
616
          parsedDefaultServiceConfig.getError());
1✔
617
      this.defaultServiceConfig =
1✔
618
          (ManagedChannelServiceConfig) parsedDefaultServiceConfig.getConfig();
1✔
619
      this.lastServiceConfig = this.defaultServiceConfig;
1✔
620
    } else {
1✔
621
      this.defaultServiceConfig = null;
1✔
622
    }
623
    this.lookUpServiceConfig = builder.lookUpServiceConfig;
1✔
624
    realChannel = new RealChannel(nameResolver.getServiceAuthority());
1✔
625
    Channel channel = realChannel;
1✔
626
    if (builder.binlog != null) {
1✔
627
      channel = builder.binlog.wrapChannel(channel);
1✔
628
    }
629
    this.interceptorChannel = ClientInterceptors.intercept(channel, interceptors);
1✔
630
    this.transportFilters = new ArrayList<>(builder.transportFilters);
1✔
631
    this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier");
1✔
632
    if (builder.idleTimeoutMillis == IDLE_TIMEOUT_MILLIS_DISABLE) {
1✔
633
      this.idleTimeoutMillis = builder.idleTimeoutMillis;
1✔
634
    } else {
635
      checkArgument(
1✔
636
          builder.idleTimeoutMillis
637
              >= ManagedChannelImplBuilder.IDLE_MODE_MIN_TIMEOUT_MILLIS,
638
          "invalid idleTimeoutMillis %s", builder.idleTimeoutMillis);
639
      this.idleTimeoutMillis = builder.idleTimeoutMillis;
1✔
640
    }
641

642
    idleTimer = new Rescheduler(
1✔
643
        new IdleModeTimer(),
644
        syncContext,
645
        transportFactory.getScheduledExecutorService(),
1✔
646
        stopwatchSupplier.get());
1✔
647
    this.fullStreamDecompression = builder.fullStreamDecompression;
1✔
648
    this.decompressorRegistry = checkNotNull(builder.decompressorRegistry, "decompressorRegistry");
1✔
649
    this.compressorRegistry = checkNotNull(builder.compressorRegistry, "compressorRegistry");
1✔
650
    this.userAgent = builder.userAgent;
1✔
651

652
    this.channelBufferLimit = builder.retryBufferSize;
1✔
653
    this.perRpcBufferLimit = builder.perRpcBufferLimit;
1✔
654
    final class ChannelCallTracerFactory implements CallTracer.Factory {
1✔
655
      @Override
656
      public CallTracer create() {
657
        return new CallTracer(timeProvider);
1✔
658
      }
659
    }
660

661
    this.callTracerFactory = new ChannelCallTracerFactory();
1✔
662
    channelCallTracer = callTracerFactory.create();
1✔
663
    this.channelz = checkNotNull(builder.channelz);
1✔
664
    channelz.addRootChannel(this);
1✔
665

666
    if (!lookUpServiceConfig) {
1✔
667
      if (defaultServiceConfig != null) {
1✔
668
        channelLogger.log(
1✔
669
            ChannelLogLevel.INFO, "Service config look-up disabled, using default service config");
670
      }
671
      serviceConfigUpdated = true;
1✔
672
    }
673
    this.metricRecorder = new MetricRecorderImpl(builder.metricSinks,
1✔
674
        MetricInstrumentRegistry.getDefaultRegistry());
1✔
675
  }
1✔
676

677
  @VisibleForTesting
678
  static NameResolver getNameResolver(
679
      URI targetUri, @Nullable final String overrideAuthority,
680
      NameResolverProvider provider, NameResolver.Args nameResolverArgs) {
681
    NameResolver resolver = provider.newNameResolver(targetUri, nameResolverArgs);
1✔
682
    if (resolver == null) {
1✔
683
      throw new IllegalArgumentException("cannot create a NameResolver for " + targetUri);
1✔
684
    }
685

686
    // We wrap the name resolver in a RetryingNameResolver to give it the ability to retry failures.
687
    // TODO: After a transition period, all NameResolver implementations that need retry should use
688
    //       RetryingNameResolver directly and this step can be removed.
689
    NameResolver usedNameResolver = new RetryingNameResolver(resolver,
1✔
690
          new BackoffPolicyRetryScheduler(new ExponentialBackoffPolicy.Provider(),
691
              nameResolverArgs.getScheduledExecutorService(),
1✔
692
              nameResolverArgs.getSynchronizationContext()),
1✔
693
          nameResolverArgs.getSynchronizationContext());
1✔
694

695
    if (overrideAuthority == null) {
1✔
696
      return usedNameResolver;
1✔
697
    }
698

699
    return new ForwardingNameResolver(usedNameResolver) {
1✔
700
      @Override
701
      public String getServiceAuthority() {
702
        return overrideAuthority;
1✔
703
      }
704
    };
705
  }
706

707
  @VisibleForTesting
708
  InternalConfigSelector getConfigSelector() {
709
    return realChannel.configSelector.get();
1✔
710
  }
711

712
  /**
713
   * Initiates an orderly shutdown in which preexisting calls continue but new calls are immediately
714
   * cancelled.
715
   */
716
  @Override
717
  public ManagedChannelImpl shutdown() {
718
    channelLogger.log(ChannelLogLevel.DEBUG, "shutdown() called");
1✔
719
    if (!shutdown.compareAndSet(false, true)) {
1✔
720
      return this;
1✔
721
    }
722
    final class Shutdown implements Runnable {
1✔
723
      @Override
724
      public void run() {
725
        channelLogger.log(ChannelLogLevel.INFO, "Entering SHUTDOWN state");
1✔
726
        channelStateManager.gotoState(SHUTDOWN);
1✔
727
      }
1✔
728
    }
729

730
    syncContext.execute(new Shutdown());
1✔
731
    realChannel.shutdown();
1✔
732
    final class CancelIdleTimer implements Runnable {
1✔
733
      @Override
734
      public void run() {
735
        cancelIdleTimer(/* permanent= */ true);
1✔
736
      }
1✔
737
    }
738

739
    syncContext.execute(new CancelIdleTimer());
1✔
740
    return this;
1✔
741
  }
742

743
  /**
744
   * Initiates a forceful shutdown in which preexisting and new calls are cancelled. Although
745
   * forceful, the shutdown process is still not instantaneous; {@link #isTerminated()} will likely
746
   * return {@code false} immediately after this method returns.
747
   */
748
  @Override
749
  public ManagedChannelImpl shutdownNow() {
750
    channelLogger.log(ChannelLogLevel.DEBUG, "shutdownNow() called");
1✔
751
    shutdown();
1✔
752
    realChannel.shutdownNow();
1✔
753
    final class ShutdownNow implements Runnable {
1✔
754
      @Override
755
      public void run() {
756
        if (shutdownNowed) {
1✔
757
          return;
1✔
758
        }
759
        shutdownNowed = true;
1✔
760
        maybeShutdownNowSubchannels();
1✔
761
      }
1✔
762
    }
763

764
    syncContext.execute(new ShutdownNow());
1✔
765
    return this;
1✔
766
  }
767

768
  // Called from syncContext
769
  @VisibleForTesting
770
  void panic(final Throwable t) {
771
    if (panicMode) {
1✔
772
      // Preserve the first panic information
773
      return;
×
774
    }
775
    panicMode = true;
1✔
776
    cancelIdleTimer(/* permanent= */ true);
1✔
777
    shutdownNameResolverAndLoadBalancer(false);
1✔
778
    final class PanicSubchannelPicker extends SubchannelPicker {
1✔
779
      private final PickResult panicPickResult =
1✔
780
          PickResult.withDrop(
1✔
781
              Status.INTERNAL.withDescription("Panic! This is a bug!").withCause(t));
1✔
782

783
      @Override
784
      public PickResult pickSubchannel(PickSubchannelArgs args) {
785
        return panicPickResult;
1✔
786
      }
787

788
      @Override
789
      public String toString() {
790
        return MoreObjects.toStringHelper(PanicSubchannelPicker.class)
×
791
            .add("panicPickResult", panicPickResult)
×
792
            .toString();
×
793
      }
794
    }
795

796
    updateSubchannelPicker(new PanicSubchannelPicker());
1✔
797
    realChannel.updateConfigSelector(null);
1✔
798
    channelLogger.log(ChannelLogLevel.ERROR, "PANIC! Entering TRANSIENT_FAILURE");
1✔
799
    channelStateManager.gotoState(TRANSIENT_FAILURE);
1✔
800
  }
1✔
801

802
  @VisibleForTesting
803
  boolean isInPanicMode() {
804
    return panicMode;
1✔
805
  }
806

807
  // Called from syncContext
808
  private void updateSubchannelPicker(SubchannelPicker newPicker) {
809
    subchannelPicker = newPicker;
1✔
810
    delayedTransport.reprocess(newPicker);
1✔
811
  }
1✔
812

813
  @Override
814
  public boolean isShutdown() {
815
    return shutdown.get();
1✔
816
  }
817

818
  @Override
819
  public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
820
    return terminatedLatch.await(timeout, unit);
1✔
821
  }
822

823
  @Override
824
  public boolean isTerminated() {
825
    return terminated;
1✔
826
  }
827

828
  /*
829
   * Creates a new outgoing call on the channel.
830
   */
831
  @Override
832
  public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(MethodDescriptor<ReqT, RespT> method,
833
      CallOptions callOptions) {
834
    return interceptorChannel.newCall(method, callOptions);
1✔
835
  }
836

837
  @Override
838
  public String authority() {
839
    return interceptorChannel.authority();
1✔
840
  }
841

842
  private Executor getCallExecutor(CallOptions callOptions) {
843
    Executor executor = callOptions.getExecutor();
1✔
844
    if (executor == null) {
1✔
845
      executor = this.executor;
1✔
846
    }
847
    return executor;
1✔
848
  }
849

850
  private class RealChannel extends Channel {
851
    // Reference to null if no config selector is available from resolution result
852
    // Reference must be set() from syncContext
853
    private final AtomicReference<InternalConfigSelector> configSelector =
1✔
854
        new AtomicReference<>(INITIAL_PENDING_SELECTOR);
1✔
855
    // Set when the NameResolver is initially created. When we create a new NameResolver for the
856
    // same target, the new instance must have the same value.
857
    private final String authority;
858

859
    private final Channel clientCallImplChannel = new Channel() {
1✔
860
      @Override
861
      public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
862
          MethodDescriptor<RequestT, ResponseT> method, CallOptions callOptions) {
863
        return new ClientCallImpl<>(
1✔
864
            method,
865
            getCallExecutor(callOptions),
1✔
866
            callOptions,
867
            transportProvider,
1✔
868
            terminated ? null : transportFactory.getScheduledExecutorService(),
1✔
869
            channelCallTracer,
1✔
870
            null)
871
            .setFullStreamDecompression(fullStreamDecompression)
1✔
872
            .setDecompressorRegistry(decompressorRegistry)
1✔
873
            .setCompressorRegistry(compressorRegistry);
1✔
874
      }
875

876
      @Override
877
      public String authority() {
878
        return authority;
×
879
      }
880
    };
881

882
    private RealChannel(String authority) {
1✔
883
      this.authority =  checkNotNull(authority, "authority");
1✔
884
    }
1✔
885

886
    @Override
887
    public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(
888
        MethodDescriptor<ReqT, RespT> method, CallOptions callOptions) {
889
      if (configSelector.get() != INITIAL_PENDING_SELECTOR) {
1✔
890
        return newClientCall(method, callOptions);
1✔
891
      }
892
      syncContext.execute(new Runnable() {
1✔
893
        @Override
894
        public void run() {
895
          exitIdleMode();
1✔
896
        }
1✔
897
      });
898
      if (configSelector.get() != INITIAL_PENDING_SELECTOR) {
1✔
899
        // This is an optimization for the case (typically with InProcessTransport) when name
900
        // resolution result is immediately available at this point. Otherwise, some users'
901
        // tests might observe slight behavior difference from earlier grpc versions.
902
        return newClientCall(method, callOptions);
1✔
903
      }
904
      if (shutdown.get()) {
1✔
905
        // Return a failing ClientCall.
906
        return new ClientCall<ReqT, RespT>() {
×
907
          @Override
908
          public void start(Listener<RespT> responseListener, Metadata headers) {
909
            responseListener.onClose(SHUTDOWN_STATUS, new Metadata());
×
910
          }
×
911

912
          @Override public void request(int numMessages) {}
×
913

914
          @Override public void cancel(@Nullable String message, @Nullable Throwable cause) {}
×
915

916
          @Override public void halfClose() {}
×
917

918
          @Override public void sendMessage(ReqT message) {}
×
919
        };
920
      }
921
      Context context = Context.current();
1✔
922
      final PendingCall<ReqT, RespT> pendingCall = new PendingCall<>(context, method, callOptions);
1✔
923
      syncContext.execute(new Runnable() {
1✔
924
        @Override
925
        public void run() {
926
          if (configSelector.get() == INITIAL_PENDING_SELECTOR) {
1✔
927
            if (pendingCalls == null) {
1✔
928
              pendingCalls = new LinkedHashSet<>();
1✔
929
              inUseStateAggregator.updateObjectInUse(pendingCallsInUseObject, true);
1✔
930
            }
931
            pendingCalls.add(pendingCall);
1✔
932
          } else {
933
            pendingCall.reprocess();
1✔
934
          }
935
        }
1✔
936
      });
937
      return pendingCall;
1✔
938
    }
939

940
    // Must run in SynchronizationContext.
941
    void updateConfigSelector(@Nullable InternalConfigSelector config) {
942
      InternalConfigSelector prevConfig = configSelector.get();
1✔
943
      configSelector.set(config);
1✔
944
      if (prevConfig == INITIAL_PENDING_SELECTOR && pendingCalls != null) {
1✔
945
        for (RealChannel.PendingCall<?, ?> pendingCall : pendingCalls) {
1✔
946
          pendingCall.reprocess();
1✔
947
        }
1✔
948
      }
949
    }
1✔
950

951
    // Must run in SynchronizationContext.
952
    void onConfigError() {
953
      if (configSelector.get() == INITIAL_PENDING_SELECTOR) {
1✔
954
        updateConfigSelector(null);
1✔
955
      }
956
    }
1✔
957

958
    void shutdown() {
959
      final class RealChannelShutdown implements Runnable {
1✔
960
        @Override
961
        public void run() {
962
          if (pendingCalls == null) {
1✔
963
            if (configSelector.get() == INITIAL_PENDING_SELECTOR) {
1✔
964
              configSelector.set(null);
1✔
965
            }
966
            uncommittedRetriableStreamsRegistry.onShutdown(SHUTDOWN_STATUS);
1✔
967
          }
968
        }
1✔
969
      }
970

971
      syncContext.execute(new RealChannelShutdown());
1✔
972
    }
1✔
973

974
    void shutdownNow() {
975
      final class RealChannelShutdownNow implements Runnable {
1✔
976
        @Override
977
        public void run() {
978
          if (configSelector.get() == INITIAL_PENDING_SELECTOR) {
1✔
979
            configSelector.set(null);
1✔
980
          }
981
          if (pendingCalls != null) {
1✔
982
            for (RealChannel.PendingCall<?, ?> pendingCall : pendingCalls) {
1✔
983
              pendingCall.cancel("Channel is forcefully shutdown", null);
1✔
984
            }
1✔
985
          }
986
          uncommittedRetriableStreamsRegistry.onShutdownNow(SHUTDOWN_NOW_STATUS);
1✔
987
        }
1✔
988
      }
989

990
      syncContext.execute(new RealChannelShutdownNow());
1✔
991
    }
1✔
992

993
    @Override
994
    public String authority() {
995
      return authority;
1✔
996
    }
997

998
    private final class PendingCall<ReqT, RespT> extends DelayedClientCall<ReqT, RespT> {
999
      final Context context;
1000
      final MethodDescriptor<ReqT, RespT> method;
1001
      final CallOptions callOptions;
1002
      private final long callCreationTime;
1003

1004
      PendingCall(
1005
          Context context, MethodDescriptor<ReqT, RespT> method, CallOptions callOptions) {
1✔
1006
        super(getCallExecutor(callOptions), scheduledExecutor, callOptions.getDeadline());
1✔
1007
        this.context = context;
1✔
1008
        this.method = method;
1✔
1009
        this.callOptions = callOptions;
1✔
1010
        this.callCreationTime = ticker.nanoTime();
1✔
1011
      }
1✔
1012

1013
      /** Called when it's ready to create a real call and reprocess the pending call. */
1014
      void reprocess() {
1015
        ClientCall<ReqT, RespT> realCall;
1016
        Context previous = context.attach();
1✔
1017
        try {
1018
          CallOptions delayResolutionOption = callOptions.withOption(NAME_RESOLUTION_DELAYED,
1✔
1019
              ticker.nanoTime() - callCreationTime);
1✔
1020
          realCall = newClientCall(method, delayResolutionOption);
1✔
1021
        } finally {
1022
          context.detach(previous);
1✔
1023
        }
1024
        Runnable toRun = setCall(realCall);
1✔
1025
        if (toRun == null) {
1✔
1026
          syncContext.execute(new PendingCallRemoval());
1✔
1027
        } else {
1028
          getCallExecutor(callOptions).execute(new Runnable() {
1✔
1029
            @Override
1030
            public void run() {
1031
              toRun.run();
1✔
1032
              syncContext.execute(new PendingCallRemoval());
1✔
1033
            }
1✔
1034
          });
1035
        }
1036
      }
1✔
1037

1038
      @Override
1039
      protected void callCancelled() {
1040
        super.callCancelled();
1✔
1041
        syncContext.execute(new PendingCallRemoval());
1✔
1042
      }
1✔
1043

1044
      final class PendingCallRemoval implements Runnable {
1✔
1045
        @Override
1046
        public void run() {
1047
          if (pendingCalls != null) {
1✔
1048
            pendingCalls.remove(PendingCall.this);
1✔
1049
            if (pendingCalls.isEmpty()) {
1✔
1050
              inUseStateAggregator.updateObjectInUse(pendingCallsInUseObject, false);
1✔
1051
              pendingCalls = null;
1✔
1052
              if (shutdown.get()) {
1✔
1053
                uncommittedRetriableStreamsRegistry.onShutdown(SHUTDOWN_STATUS);
1✔
1054
              }
1055
            }
1056
          }
1057
        }
1✔
1058
      }
1059
    }
1060

1061
    private <ReqT, RespT> ClientCall<ReqT, RespT> newClientCall(
1062
        MethodDescriptor<ReqT, RespT> method, CallOptions callOptions) {
1063
      InternalConfigSelector selector = configSelector.get();
1✔
1064
      if (selector == null) {
1✔
1065
        return clientCallImplChannel.newCall(method, callOptions);
1✔
1066
      }
1067
      if (selector instanceof ServiceConfigConvertedSelector) {
1✔
1068
        MethodInfo methodInfo =
1✔
1069
            ((ServiceConfigConvertedSelector) selector).config.getMethodConfig(method);
1✔
1070
        if (methodInfo != null) {
1✔
1071
          callOptions = callOptions.withOption(MethodInfo.KEY, methodInfo);
1✔
1072
        }
1073
        return clientCallImplChannel.newCall(method, callOptions);
1✔
1074
      }
1075
      return new ConfigSelectingClientCall<>(
1✔
1076
          selector, clientCallImplChannel, executor, method, callOptions);
1✔
1077
    }
1078
  }
1079

1080
  /**
1081
   * A client call for a given channel that applies a given config selector when it starts.
1082
   */
1083
  static final class ConfigSelectingClientCall<ReqT, RespT>
1084
      extends ForwardingClientCall<ReqT, RespT> {
1085

1086
    private final InternalConfigSelector configSelector;
1087
    private final Channel channel;
1088
    private final Executor callExecutor;
1089
    private final MethodDescriptor<ReqT, RespT> method;
1090
    private final Context context;
1091
    private CallOptions callOptions;
1092

1093
    private ClientCall<ReqT, RespT> delegate;
1094

1095
    ConfigSelectingClientCall(
1096
        InternalConfigSelector configSelector, Channel channel, Executor channelExecutor,
1097
        MethodDescriptor<ReqT, RespT> method,
1098
        CallOptions callOptions) {
1✔
1099
      this.configSelector = configSelector;
1✔
1100
      this.channel = channel;
1✔
1101
      this.method = method;
1✔
1102
      this.callExecutor =
1✔
1103
          callOptions.getExecutor() == null ? channelExecutor : callOptions.getExecutor();
1✔
1104
      this.callOptions = callOptions.withExecutor(callExecutor);
1✔
1105
      this.context = Context.current();
1✔
1106
    }
1✔
1107

1108
    @Override
1109
    protected ClientCall<ReqT, RespT> delegate() {
1110
      return delegate;
1✔
1111
    }
1112

1113
    @SuppressWarnings("unchecked")
1114
    @Override
1115
    public void start(Listener<RespT> observer, Metadata headers) {
1116
      PickSubchannelArgs args =
1✔
1117
          new PickSubchannelArgsImpl(method, headers, callOptions, NOOP_PICK_DETAILS_CONSUMER);
1✔
1118
      InternalConfigSelector.Result result = configSelector.selectConfig(args);
1✔
1119
      Status status = result.getStatus();
1✔
1120
      if (!status.isOk()) {
1✔
1121
        executeCloseObserverInContext(observer,
1✔
1122
            GrpcUtil.replaceInappropriateControlPlaneStatus(status));
1✔
1123
        delegate = (ClientCall<ReqT, RespT>) NOOP_CALL;
1✔
1124
        return;
1✔
1125
      }
1126
      ClientInterceptor interceptor = result.getInterceptor();
1✔
1127
      ManagedChannelServiceConfig config = (ManagedChannelServiceConfig) result.getConfig();
1✔
1128
      MethodInfo methodInfo = config.getMethodConfig(method);
1✔
1129
      if (methodInfo != null) {
1✔
1130
        callOptions = callOptions.withOption(MethodInfo.KEY, methodInfo);
1✔
1131
      }
1132
      if (interceptor != null) {
1✔
1133
        delegate = interceptor.interceptCall(method, callOptions, channel);
1✔
1134
      } else {
1135
        delegate = channel.newCall(method, callOptions);
×
1136
      }
1137
      delegate.start(observer, headers);
1✔
1138
    }
1✔
1139

1140
    private void executeCloseObserverInContext(
1141
        final Listener<RespT> observer, final Status status) {
1142
      class CloseInContext extends ContextRunnable {
1143
        CloseInContext() {
1✔
1144
          super(context);
1✔
1145
        }
1✔
1146

1147
        @Override
1148
        public void runInContext() {
1149
          observer.onClose(status, new Metadata());
1✔
1150
        }
1✔
1151
      }
1152

1153
      callExecutor.execute(new CloseInContext());
1✔
1154
    }
1✔
1155

1156
    @Override
1157
    public void cancel(@Nullable String message, @Nullable Throwable cause) {
1158
      if (delegate != null) {
×
1159
        delegate.cancel(message, cause);
×
1160
      }
1161
    }
×
1162
  }
1163

1164
  private static final ClientCall<Object, Object> NOOP_CALL = new ClientCall<Object, Object>() {
1✔
1165
    @Override
1166
    public void start(Listener<Object> responseListener, Metadata headers) {}
×
1167

1168
    @Override
1169
    public void request(int numMessages) {}
1✔
1170

1171
    @Override
1172
    public void cancel(String message, Throwable cause) {}
×
1173

1174
    @Override
1175
    public void halfClose() {}
×
1176

1177
    @Override
1178
    public void sendMessage(Object message) {}
×
1179

1180
    // Always returns {@code false}, since this is only used when the startup of the call fails.
1181
    @Override
1182
    public boolean isReady() {
1183
      return false;
×
1184
    }
1185
  };
1186

1187
  /**
1188
   * Terminate the channel if termination conditions are met.
1189
   */
1190
  // Must be run from syncContext
1191
  private void maybeTerminateChannel() {
1192
    if (terminated) {
1✔
1193
      return;
×
1194
    }
1195
    if (shutdown.get() && subchannels.isEmpty() && oobChannels.isEmpty()) {
1✔
1196
      channelLogger.log(ChannelLogLevel.INFO, "Terminated");
1✔
1197
      channelz.removeRootChannel(this);
1✔
1198
      executorPool.returnObject(executor);
1✔
1199
      balancerRpcExecutorHolder.release();
1✔
1200
      offloadExecutorHolder.release();
1✔
1201
      // Release the transport factory so that it can deallocate any resources.
1202
      transportFactory.close();
1✔
1203

1204
      terminated = true;
1✔
1205
      terminatedLatch.countDown();
1✔
1206
    }
1207
  }
1✔
1208

1209
  // Must be called from syncContext
1210
  private void handleInternalSubchannelState(ConnectivityStateInfo newState) {
1211
    if (newState.getState() == TRANSIENT_FAILURE || newState.getState() == IDLE) {
1✔
1212
      refreshNameResolution();
1✔
1213
    }
1214
  }
1✔
1215

1216
  @Override
1217
  @SuppressWarnings("deprecation")
1218
  public ConnectivityState getState(boolean requestConnection) {
1219
    ConnectivityState savedChannelState = channelStateManager.getState();
1✔
1220
    if (requestConnection && savedChannelState == IDLE) {
1✔
1221
      final class RequestConnection implements Runnable {
1✔
1222
        @Override
1223
        public void run() {
1224
          exitIdleMode();
1✔
1225
          if (subchannelPicker != null) {
1✔
1226
            subchannelPicker.requestConnection();
1✔
1227
          }
1228
          if (lbHelper != null) {
1✔
1229
            lbHelper.lb.requestConnection();
1✔
1230
          }
1231
        }
1✔
1232
      }
1233

1234
      syncContext.execute(new RequestConnection());
1✔
1235
    }
1236
    return savedChannelState;
1✔
1237
  }
1238

1239
  @Override
1240
  public void notifyWhenStateChanged(final ConnectivityState source, final Runnable callback) {
1241
    final class NotifyStateChanged implements Runnable {
1✔
1242
      @Override
1243
      public void run() {
1244
        channelStateManager.notifyWhenStateChanged(callback, executor, source);
1✔
1245
      }
1✔
1246
    }
1247

1248
    syncContext.execute(new NotifyStateChanged());
1✔
1249
  }
1✔
1250

1251
  @Override
1252
  public void resetConnectBackoff() {
1253
    final class ResetConnectBackoff implements Runnable {
1✔
1254
      @Override
1255
      public void run() {
1256
        if (shutdown.get()) {
1✔
1257
          return;
1✔
1258
        }
1259
        if (nameResolverStarted) {
1✔
1260
          refreshNameResolution();
1✔
1261
        }
1262
        for (InternalSubchannel subchannel : subchannels) {
1✔
1263
          subchannel.resetConnectBackoff();
1✔
1264
        }
1✔
1265
        for (OobChannel oobChannel : oobChannels) {
1✔
1266
          oobChannel.resetConnectBackoff();
×
1267
        }
×
1268
      }
1✔
1269
    }
1270

1271
    syncContext.execute(new ResetConnectBackoff());
1✔
1272
  }
1✔
1273

1274
  @Override
1275
  public void enterIdle() {
1276
    final class PrepareToLoseNetworkRunnable implements Runnable {
1✔
1277
      @Override
1278
      public void run() {
1279
        if (shutdown.get() || lbHelper == null) {
1✔
1280
          return;
1✔
1281
        }
1282
        cancelIdleTimer(/* permanent= */ false);
1✔
1283
        enterIdleMode();
1✔
1284
      }
1✔
1285
    }
1286

1287
    syncContext.execute(new PrepareToLoseNetworkRunnable());
1✔
1288
  }
1✔
1289

1290
  /**
1291
   * A registry that prevents channel shutdown from killing existing retry attempts that are in
1292
   * backoff.
1293
   */
1294
  private final class UncommittedRetriableStreamsRegistry {
1✔
1295
    // TODO(zdapeng): This means we would acquire a lock for each new retry-able stream,
1296
    // it's worthwhile to look for a lock-free approach.
1297
    final Object lock = new Object();
1✔
1298

1299
    @GuardedBy("lock")
1✔
1300
    Collection<ClientStream> uncommittedRetriableStreams = new HashSet<>();
1301

1302
    @GuardedBy("lock")
1303
    Status shutdownStatus;
1304

1305
    void onShutdown(Status reason) {
1306
      boolean shouldShutdownDelayedTransport = false;
1✔
1307
      synchronized (lock) {
1✔
1308
        if (shutdownStatus != null) {
1✔
1309
          return;
1✔
1310
        }
1311
        shutdownStatus = reason;
1✔
1312
        // Keep the delayedTransport open until there is no more uncommitted streams, b/c those
1313
        // retriable streams, which may be in backoff and not using any transport, are already
1314
        // started RPCs.
1315
        if (uncommittedRetriableStreams.isEmpty()) {
1✔
1316
          shouldShutdownDelayedTransport = true;
1✔
1317
        }
1318
      }
1✔
1319

1320
      if (shouldShutdownDelayedTransport) {
1✔
1321
        delayedTransport.shutdown(reason);
1✔
1322
      }
1323
    }
1✔
1324

1325
    void onShutdownNow(Status reason) {
1326
      onShutdown(reason);
1✔
1327
      Collection<ClientStream> streams;
1328

1329
      synchronized (lock) {
1✔
1330
        streams = new ArrayList<>(uncommittedRetriableStreams);
1✔
1331
      }
1✔
1332

1333
      for (ClientStream stream : streams) {
1✔
1334
        stream.cancel(reason);
1✔
1335
      }
1✔
1336
      delayedTransport.shutdownNow(reason);
1✔
1337
    }
1✔
1338

1339
    /**
1340
     * Registers a RetriableStream and return null if not shutdown, otherwise just returns the
1341
     * shutdown Status.
1342
     */
1343
    @Nullable
1344
    Status add(RetriableStream<?> retriableStream) {
1345
      synchronized (lock) {
1✔
1346
        if (shutdownStatus != null) {
1✔
1347
          return shutdownStatus;
1✔
1348
        }
1349
        uncommittedRetriableStreams.add(retriableStream);
1✔
1350
        return null;
1✔
1351
      }
1352
    }
1353

1354
    void remove(RetriableStream<?> retriableStream) {
1355
      Status shutdownStatusCopy = null;
1✔
1356

1357
      synchronized (lock) {
1✔
1358
        uncommittedRetriableStreams.remove(retriableStream);
1✔
1359
        if (uncommittedRetriableStreams.isEmpty()) {
1✔
1360
          shutdownStatusCopy = shutdownStatus;
1✔
1361
          // Because retriable transport is long-lived, we take this opportunity to down-size the
1362
          // hashmap.
1363
          uncommittedRetriableStreams = new HashSet<>();
1✔
1364
        }
1365
      }
1✔
1366

1367
      if (shutdownStatusCopy != null) {
1✔
1368
        delayedTransport.shutdown(shutdownStatusCopy);
1✔
1369
      }
1370
    }
1✔
1371
  }
1372

1373
  private final class LbHelperImpl extends LoadBalancer.Helper {
1✔
1374
    AutoConfiguredLoadBalancer lb;
1375

1376
    @Override
1377
    public AbstractSubchannel createSubchannel(CreateSubchannelArgs args) {
1378
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1379
      // No new subchannel should be created after load balancer has been shutdown.
1380
      checkState(!terminating, "Channel is being terminated");
1✔
1381
      return new SubchannelImpl(args);
1✔
1382
    }
1383

1384
    @Override
1385
    public void updateBalancingState(
1386
        final ConnectivityState newState, final SubchannelPicker newPicker) {
1387
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1388
      checkNotNull(newState, "newState");
1✔
1389
      checkNotNull(newPicker, "newPicker");
1✔
1390
      final class UpdateBalancingState implements Runnable {
1✔
1391
        @Override
1392
        public void run() {
1393
          if (LbHelperImpl.this != lbHelper) {
1✔
1394
            return;
1✔
1395
          }
1396
          updateSubchannelPicker(newPicker);
1✔
1397
          // It's not appropriate to report SHUTDOWN state from lb.
1398
          // Ignore the case of newState == SHUTDOWN for now.
1399
          if (newState != SHUTDOWN) {
1✔
1400
            channelLogger.log(
1✔
1401
                ChannelLogLevel.INFO, "Entering {0} state with picker: {1}", newState, newPicker);
1402
            channelStateManager.gotoState(newState);
1✔
1403
          }
1404
        }
1✔
1405
      }
1406

1407
      syncContext.execute(new UpdateBalancingState());
1✔
1408
    }
1✔
1409

1410
    @Override
1411
    public void refreshNameResolution() {
1412
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1413
      final class LoadBalancerRefreshNameResolution implements Runnable {
1✔
1414
        @Override
1415
        public void run() {
1416
          ManagedChannelImpl.this.refreshNameResolution();
1✔
1417
        }
1✔
1418
      }
1419

1420
      syncContext.execute(new LoadBalancerRefreshNameResolution());
1✔
1421
    }
1✔
1422

1423
    @Override
1424
    public ManagedChannel createOobChannel(EquivalentAddressGroup addressGroup, String authority) {
1425
      return createOobChannel(Collections.singletonList(addressGroup), authority);
×
1426
    }
1427

1428
    @Override
1429
    public ManagedChannel createOobChannel(List<EquivalentAddressGroup> addressGroup,
1430
        String authority) {
1431
      // TODO(ejona): can we be even stricter? Like terminating?
1432
      checkState(!terminated, "Channel is terminated");
1✔
1433
      long oobChannelCreationTime = timeProvider.currentTimeNanos();
1✔
1434
      InternalLogId oobLogId = InternalLogId.allocate("OobChannel", /*details=*/ null);
1✔
1435
      InternalLogId subchannelLogId =
1✔
1436
          InternalLogId.allocate("Subchannel-OOB", /*details=*/ authority);
1✔
1437
      ChannelTracer oobChannelTracer =
1✔
1438
          new ChannelTracer(
1439
              oobLogId, maxTraceEvents, oobChannelCreationTime,
1✔
1440
              "OobChannel for " + addressGroup);
1441
      final OobChannel oobChannel = new OobChannel(
1✔
1442
          authority, balancerRpcExecutorPool, oobTransportFactory.getScheduledExecutorService(),
1✔
1443
          syncContext, callTracerFactory.create(), oobChannelTracer, channelz, timeProvider);
1✔
1444
      channelTracer.reportEvent(new ChannelTrace.Event.Builder()
1✔
1445
          .setDescription("Child OobChannel created")
1✔
1446
          .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
1✔
1447
          .setTimestampNanos(oobChannelCreationTime)
1✔
1448
          .setChannelRef(oobChannel)
1✔
1449
          .build());
1✔
1450
      ChannelTracer subchannelTracer =
1✔
1451
          new ChannelTracer(subchannelLogId, maxTraceEvents, oobChannelCreationTime,
1✔
1452
              "Subchannel for " + addressGroup);
1453
      ChannelLogger subchannelLogger = new ChannelLoggerImpl(subchannelTracer, timeProvider);
1✔
1454
      final class ManagedOobChannelCallback extends InternalSubchannel.Callback {
1✔
1455
        @Override
1456
        void onTerminated(InternalSubchannel is) {
1457
          oobChannels.remove(oobChannel);
1✔
1458
          channelz.removeSubchannel(is);
1✔
1459
          oobChannel.handleSubchannelTerminated();
1✔
1460
          maybeTerminateChannel();
1✔
1461
        }
1✔
1462

1463
        @Override
1464
        void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) {
1465
          // TODO(chengyuanzhang): change to let LB policies explicitly manage OOB channel's
1466
          //  state and refresh name resolution if necessary.
1467
          handleInternalSubchannelState(newState);
1✔
1468
          oobChannel.handleSubchannelStateChange(newState);
1✔
1469
        }
1✔
1470
      }
1471

1472
      final InternalSubchannel internalSubchannel = new InternalSubchannel(
1✔
1473
          addressGroup,
1474
          authority, userAgent, backoffPolicyProvider, oobTransportFactory,
1✔
1475
          oobTransportFactory.getScheduledExecutorService(), stopwatchSupplier, syncContext,
1✔
1476
          // All callback methods are run from syncContext
1477
          new ManagedOobChannelCallback(),
1478
          channelz,
1✔
1479
          callTracerFactory.create(),
1✔
1480
          subchannelTracer,
1481
          subchannelLogId,
1482
          subchannelLogger,
1483
          transportFilters);
1✔
1484
      oobChannelTracer.reportEvent(new ChannelTrace.Event.Builder()
1✔
1485
          .setDescription("Child Subchannel created")
1✔
1486
          .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
1✔
1487
          .setTimestampNanos(oobChannelCreationTime)
1✔
1488
          .setSubchannelRef(internalSubchannel)
1✔
1489
          .build());
1✔
1490
      channelz.addSubchannel(oobChannel);
1✔
1491
      channelz.addSubchannel(internalSubchannel);
1✔
1492
      oobChannel.setSubchannel(internalSubchannel);
1✔
1493
      final class AddOobChannel implements Runnable {
1✔
1494
        @Override
1495
        public void run() {
1496
          if (terminating) {
1✔
1497
            oobChannel.shutdown();
×
1498
          }
1499
          if (!terminated) {
1✔
1500
            // If channel has not terminated, it will track the subchannel and block termination
1501
            // for it.
1502
            oobChannels.add(oobChannel);
1✔
1503
          }
1504
        }
1✔
1505
      }
1506

1507
      syncContext.execute(new AddOobChannel());
1✔
1508
      return oobChannel;
1✔
1509
    }
1510

1511
    @Deprecated
1512
    @Override
1513
    public ManagedChannelBuilder<?> createResolvingOobChannelBuilder(String target) {
1514
      return createResolvingOobChannelBuilder(target, new DefaultChannelCreds())
1✔
1515
          // Override authority to keep the old behavior.
1516
          // createResolvingOobChannelBuilder(String target) will be deleted soon.
1517
          .overrideAuthority(getAuthority());
1✔
1518
    }
1519

1520
    // TODO(creamsoup) prevent main channel to shutdown if oob channel is not terminated
1521
    // TODO(zdapeng) register the channel as a subchannel of the parent channel in channelz.
1522
    @Override
1523
    public ManagedChannelBuilder<?> createResolvingOobChannelBuilder(
1524
        final String target, final ChannelCredentials channelCreds) {
1525
      checkNotNull(channelCreds, "channelCreds");
1✔
1526

1527
      final class ResolvingOobChannelBuilder
1528
          extends ForwardingChannelBuilder2<ResolvingOobChannelBuilder> {
1529
        final ManagedChannelBuilder<?> delegate;
1530

1531
        ResolvingOobChannelBuilder() {
1✔
1532
          final ClientTransportFactory transportFactory;
1533
          CallCredentials callCredentials;
1534
          if (channelCreds instanceof DefaultChannelCreds) {
1✔
1535
            transportFactory = originalTransportFactory;
1✔
1536
            callCredentials = null;
1✔
1537
          } else {
1538
            SwapChannelCredentialsResult swapResult =
1✔
1539
                originalTransportFactory.swapChannelCredentials(channelCreds);
1✔
1540
            if (swapResult == null) {
1✔
1541
              delegate = Grpc.newChannelBuilder(target, channelCreds);
×
1542
              return;
×
1543
            } else {
1544
              transportFactory = swapResult.transportFactory;
1✔
1545
              callCredentials = swapResult.callCredentials;
1✔
1546
            }
1547
          }
1548
          ClientTransportFactoryBuilder transportFactoryBuilder =
1✔
1549
              new ClientTransportFactoryBuilder() {
1✔
1550
                @Override
1551
                public ClientTransportFactory buildClientTransportFactory() {
1552
                  return transportFactory;
1✔
1553
                }
1554
              };
1555
          delegate = new ManagedChannelImplBuilder(
1✔
1556
              target,
1557
              channelCreds,
1558
              callCredentials,
1559
              transportFactoryBuilder,
1560
              new FixedPortProvider(nameResolverArgs.getDefaultPort()))
1✔
1561
              .nameResolverRegistry(nameResolverRegistry);
1✔
1562
        }
1✔
1563

1564
        @Override
1565
        protected ManagedChannelBuilder<?> delegate() {
1566
          return delegate;
1✔
1567
        }
1568
      }
1569

1570
      checkState(!terminated, "Channel is terminated");
1✔
1571

1572
      @SuppressWarnings("deprecation")
1573
      ResolvingOobChannelBuilder builder = new ResolvingOobChannelBuilder();
1✔
1574

1575
      return builder
1✔
1576
          // TODO(zdapeng): executors should not outlive the parent channel.
1577
          .executor(executor)
1✔
1578
          .offloadExecutor(offloadExecutorHolder.getExecutor())
1✔
1579
          .maxTraceEvents(maxTraceEvents)
1✔
1580
          .proxyDetector(nameResolverArgs.getProxyDetector())
1✔
1581
          .userAgent(userAgent);
1✔
1582
    }
1583

1584
    @Override
1585
    public ChannelCredentials getUnsafeChannelCredentials() {
1586
      if (originalChannelCreds == null) {
1✔
1587
        return new DefaultChannelCreds();
1✔
1588
      }
1589
      return originalChannelCreds;
×
1590
    }
1591

1592
    @Override
1593
    public void updateOobChannelAddresses(ManagedChannel channel, EquivalentAddressGroup eag) {
1594
      updateOobChannelAddresses(channel, Collections.singletonList(eag));
×
1595
    }
×
1596

1597
    @Override
1598
    public void updateOobChannelAddresses(ManagedChannel channel,
1599
        List<EquivalentAddressGroup> eag) {
1600
      checkArgument(channel instanceof OobChannel,
1✔
1601
          "channel must have been returned from createOobChannel");
1602
      ((OobChannel) channel).updateAddresses(eag);
1✔
1603
    }
1✔
1604

1605
    @Override
1606
    public String getAuthority() {
1607
      return ManagedChannelImpl.this.authority();
1✔
1608
    }
1609

1610
    @Override
1611
    public String getChannelTarget() {
1612
      return targetUri.toString();
1✔
1613
    }
1614

1615
    @Override
1616
    public SynchronizationContext getSynchronizationContext() {
1617
      return syncContext;
1✔
1618
    }
1619

1620
    @Override
1621
    public ScheduledExecutorService getScheduledExecutorService() {
1622
      return scheduledExecutor;
1✔
1623
    }
1624

1625
    @Override
1626
    public ChannelLogger getChannelLogger() {
1627
      return channelLogger;
1✔
1628
    }
1629

1630
    @Override
1631
    public NameResolver.Args getNameResolverArgs() {
1632
      return nameResolverArgs;
1✔
1633
    }
1634

1635
    @Override
1636
    public NameResolverRegistry getNameResolverRegistry() {
1637
      return nameResolverRegistry;
1✔
1638
    }
1639

1640
    @Override
1641
    public MetricRecorder getMetricRecorder() {
1642
      return metricRecorder;
1✔
1643
    }
1644

1645
    /**
1646
     * A placeholder for channel creds if user did not specify channel creds for the channel.
1647
     */
1648
    // TODO(zdapeng): get rid of this class and let all ChannelBuilders always provide a non-null
1649
    //     channel creds.
1650
    final class DefaultChannelCreds extends ChannelCredentials {
1✔
1651
      @Override
1652
      public ChannelCredentials withoutBearerTokens() {
1653
        return this;
×
1654
      }
1655
    }
1656
  }
1657

1658
  final class NameResolverListener extends NameResolver.Listener2 {
1659
    final LbHelperImpl helper;
1660
    final NameResolver resolver;
1661

1662
    NameResolverListener(LbHelperImpl helperImpl, NameResolver resolver) {
1✔
1663
      this.helper = checkNotNull(helperImpl, "helperImpl");
1✔
1664
      this.resolver = checkNotNull(resolver, "resolver");
1✔
1665
    }
1✔
1666

1667
    @Override
1668
    public void onResult(final ResolutionResult resolutionResult) {
1669
      final class NamesResolved implements Runnable {
1✔
1670

1671
        @SuppressWarnings("ReferenceEquality")
1672
        @Override
1673
        public void run() {
1674
          if (ManagedChannelImpl.this.nameResolver != resolver) {
1✔
1675
            return;
1✔
1676
          }
1677

1678
          List<EquivalentAddressGroup> servers = resolutionResult.getAddresses();
1✔
1679
          channelLogger.log(
1✔
1680
              ChannelLogLevel.DEBUG,
1681
              "Resolved address: {0}, config={1}",
1682
              servers,
1683
              resolutionResult.getAttributes());
1✔
1684

1685
          if (lastResolutionState != ResolutionState.SUCCESS) {
1✔
1686
            channelLogger.log(ChannelLogLevel.INFO, "Address resolved: {0}", servers);
1✔
1687
            lastResolutionState = ResolutionState.SUCCESS;
1✔
1688
          }
1689

1690
          ConfigOrError configOrError = resolutionResult.getServiceConfig();
1✔
1691
          ResolutionResultListener resolutionResultListener = resolutionResult.getAttributes()
1✔
1692
              .get(RetryingNameResolver.RESOLUTION_RESULT_LISTENER_KEY);
1✔
1693
          InternalConfigSelector resolvedConfigSelector =
1✔
1694
              resolutionResult.getAttributes().get(InternalConfigSelector.KEY);
1✔
1695
          ManagedChannelServiceConfig validServiceConfig =
1696
              configOrError != null && configOrError.getConfig() != null
1✔
1697
                  ? (ManagedChannelServiceConfig) configOrError.getConfig()
1✔
1698
                  : null;
1✔
1699
          Status serviceConfigError = configOrError != null ? configOrError.getError() : null;
1✔
1700

1701
          ManagedChannelServiceConfig effectiveServiceConfig;
1702
          if (!lookUpServiceConfig) {
1✔
1703
            if (validServiceConfig != null) {
1✔
1704
              channelLogger.log(
1✔
1705
                  ChannelLogLevel.INFO,
1706
                  "Service config from name resolver discarded by channel settings");
1707
            }
1708
            effectiveServiceConfig =
1709
                defaultServiceConfig == null ? EMPTY_SERVICE_CONFIG : defaultServiceConfig;
1✔
1710
            if (resolvedConfigSelector != null) {
1✔
1711
              channelLogger.log(
1✔
1712
                  ChannelLogLevel.INFO,
1713
                  "Config selector from name resolver discarded by channel settings");
1714
            }
1715
            realChannel.updateConfigSelector(effectiveServiceConfig.getDefaultConfigSelector());
1✔
1716
          } else {
1717
            // Try to use config if returned from name resolver
1718
            // Otherwise, try to use the default config if available
1719
            if (validServiceConfig != null) {
1✔
1720
              effectiveServiceConfig = validServiceConfig;
1✔
1721
              if (resolvedConfigSelector != null) {
1✔
1722
                realChannel.updateConfigSelector(resolvedConfigSelector);
1✔
1723
                if (effectiveServiceConfig.getDefaultConfigSelector() != null) {
1✔
1724
                  channelLogger.log(
×
1725
                      ChannelLogLevel.DEBUG,
1726
                      "Method configs in service config will be discarded due to presence of"
1727
                          + "config-selector");
1728
                }
1729
              } else {
1730
                realChannel.updateConfigSelector(effectiveServiceConfig.getDefaultConfigSelector());
1✔
1731
              }
1732
            } else if (defaultServiceConfig != null) {
1✔
1733
              effectiveServiceConfig = defaultServiceConfig;
1✔
1734
              realChannel.updateConfigSelector(effectiveServiceConfig.getDefaultConfigSelector());
1✔
1735
              channelLogger.log(
1✔
1736
                  ChannelLogLevel.INFO,
1737
                  "Received no service config, using default service config");
1738
            } else if (serviceConfigError != null) {
1✔
1739
              if (!serviceConfigUpdated) {
1✔
1740
                // First DNS lookup has invalid service config, and cannot fall back to default
1741
                channelLogger.log(
1✔
1742
                    ChannelLogLevel.INFO,
1743
                    "Fallback to error due to invalid first service config without default config");
1744
                // This error could be an "inappropriate" control plane error that should not bleed
1745
                // through to client code using gRPC. We let them flow through here to the LB as
1746
                // we later check for these error codes when investigating pick results in
1747
                // GrpcUtil.getTransportFromPickResult().
1748
                onError(configOrError.getError());
1✔
1749
                if (resolutionResultListener != null) {
1✔
1750
                  resolutionResultListener.resolutionAttempted(configOrError.getError());
1✔
1751
                }
1752
                return;
1✔
1753
              } else {
1754
                effectiveServiceConfig = lastServiceConfig;
1✔
1755
              }
1756
            } else {
1757
              effectiveServiceConfig = EMPTY_SERVICE_CONFIG;
1✔
1758
              realChannel.updateConfigSelector(null);
1✔
1759
            }
1760
            if (!effectiveServiceConfig.equals(lastServiceConfig)) {
1✔
1761
              channelLogger.log(
1✔
1762
                  ChannelLogLevel.INFO,
1763
                  "Service config changed{0}",
1764
                  effectiveServiceConfig == EMPTY_SERVICE_CONFIG ? " to empty" : "");
1✔
1765
              lastServiceConfig = effectiveServiceConfig;
1✔
1766
              transportProvider.throttle = effectiveServiceConfig.getRetryThrottling();
1✔
1767
            }
1768

1769
            try {
1770
              // TODO(creamsoup): when `servers` is empty and lastResolutionStateCopy == SUCCESS
1771
              //  and lbNeedAddress, it shouldn't call the handleServiceConfigUpdate. But,
1772
              //  lbNeedAddress is not deterministic
1773
              serviceConfigUpdated = true;
1✔
1774
            } catch (RuntimeException re) {
×
1775
              logger.log(
×
1776
                  Level.WARNING,
1777
                  "[" + getLogId() + "] Unexpected exception from parsing service config",
×
1778
                  re);
1779
            }
1✔
1780
          }
1781

1782
          Attributes effectiveAttrs = resolutionResult.getAttributes();
1✔
1783
          // Call LB only if it's not shutdown.  If LB is shutdown, lbHelper won't match.
1784
          if (NameResolverListener.this.helper == ManagedChannelImpl.this.lbHelper) {
1✔
1785
            Attributes.Builder attrBuilder =
1✔
1786
                effectiveAttrs.toBuilder().discard(InternalConfigSelector.KEY);
1✔
1787
            Map<String, ?> healthCheckingConfig =
1✔
1788
                effectiveServiceConfig.getHealthCheckingConfig();
1✔
1789
            if (healthCheckingConfig != null) {
1✔
1790
              attrBuilder
1✔
1791
                  .set(LoadBalancer.ATTR_HEALTH_CHECKING_CONFIG, healthCheckingConfig)
1✔
1792
                  .build();
1✔
1793
            }
1794
            Attributes attributes = attrBuilder.build();
1✔
1795

1796
            Status addressAcceptanceStatus = helper.lb.tryAcceptResolvedAddresses(
1✔
1797
                ResolvedAddresses.newBuilder()
1✔
1798
                    .setAddresses(servers)
1✔
1799
                    .setAttributes(attributes)
1✔
1800
                    .setLoadBalancingPolicyConfig(effectiveServiceConfig.getLoadBalancingConfig())
1✔
1801
                    .build());
1✔
1802
            // If a listener is provided, let it know if the addresses were accepted.
1803
            if (resolutionResultListener != null) {
1✔
1804
              resolutionResultListener.resolutionAttempted(addressAcceptanceStatus);
1✔
1805
            }
1806
          }
1807
        }
1✔
1808
      }
1809

1810
      syncContext.execute(new NamesResolved());
1✔
1811
    }
1✔
1812

1813
    @Override
1814
    public void onError(final Status error) {
1815
      checkArgument(!error.isOk(), "the error status must not be OK");
1✔
1816
      final class NameResolverErrorHandler implements Runnable {
1✔
1817
        @Override
1818
        public void run() {
1819
          handleErrorInSyncContext(error);
1✔
1820
        }
1✔
1821
      }
1822

1823
      syncContext.execute(new NameResolverErrorHandler());
1✔
1824
    }
1✔
1825

1826
    private void handleErrorInSyncContext(Status error) {
1827
      logger.log(Level.WARNING, "[{0}] Failed to resolve name. status={1}",
1✔
1828
          new Object[] {getLogId(), error});
1✔
1829
      realChannel.onConfigError();
1✔
1830
      if (lastResolutionState != ResolutionState.ERROR) {
1✔
1831
        channelLogger.log(ChannelLogLevel.WARNING, "Failed to resolve name: {0}", error);
1✔
1832
        lastResolutionState = ResolutionState.ERROR;
1✔
1833
      }
1834
      // Call LB only if it's not shutdown.  If LB is shutdown, lbHelper won't match.
1835
      if (NameResolverListener.this.helper != ManagedChannelImpl.this.lbHelper) {
1✔
1836
        return;
1✔
1837
      }
1838

1839
      helper.lb.handleNameResolutionError(error);
1✔
1840
    }
1✔
1841
  }
1842

1843
  private final class SubchannelImpl extends AbstractSubchannel {
1844
    final CreateSubchannelArgs args;
1845
    final InternalLogId subchannelLogId;
1846
    final ChannelLoggerImpl subchannelLogger;
1847
    final ChannelTracer subchannelTracer;
1848
    List<EquivalentAddressGroup> addressGroups;
1849
    InternalSubchannel subchannel;
1850
    boolean started;
1851
    boolean shutdown;
1852
    ScheduledHandle delayedShutdownTask;
1853

1854
    SubchannelImpl(CreateSubchannelArgs args) {
1✔
1855
      checkNotNull(args, "args");
1✔
1856
      addressGroups = args.getAddresses();
1✔
1857
      if (authorityOverride != null) {
1✔
1858
        List<EquivalentAddressGroup> eagsWithoutOverrideAttr =
1✔
1859
            stripOverrideAuthorityAttributes(args.getAddresses());
1✔
1860
        args = args.toBuilder().setAddresses(eagsWithoutOverrideAttr).build();
1✔
1861
      }
1862
      this.args = args;
1✔
1863
      subchannelLogId = InternalLogId.allocate("Subchannel", /*details=*/ authority());
1✔
1864
      subchannelTracer = new ChannelTracer(
1✔
1865
          subchannelLogId, maxTraceEvents, timeProvider.currentTimeNanos(),
1✔
1866
          "Subchannel for " + args.getAddresses());
1✔
1867
      subchannelLogger = new ChannelLoggerImpl(subchannelTracer, timeProvider);
1✔
1868
    }
1✔
1869

1870
    @Override
1871
    public void start(final SubchannelStateListener listener) {
1872
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1873
      checkState(!started, "already started");
1✔
1874
      checkState(!shutdown, "already shutdown");
1✔
1875
      checkState(!terminating, "Channel is being terminated");
1✔
1876
      started = true;
1✔
1877
      final class ManagedInternalSubchannelCallback extends InternalSubchannel.Callback {
1✔
1878
        // All callbacks are run in syncContext
1879
        @Override
1880
        void onTerminated(InternalSubchannel is) {
1881
          subchannels.remove(is);
1✔
1882
          channelz.removeSubchannel(is);
1✔
1883
          maybeTerminateChannel();
1✔
1884
        }
1✔
1885

1886
        @Override
1887
        void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) {
1888
          checkState(listener != null, "listener is null");
1✔
1889
          listener.onSubchannelState(newState);
1✔
1890
        }
1✔
1891

1892
        @Override
1893
        void onInUse(InternalSubchannel is) {
1894
          inUseStateAggregator.updateObjectInUse(is, true);
1✔
1895
        }
1✔
1896

1897
        @Override
1898
        void onNotInUse(InternalSubchannel is) {
1899
          inUseStateAggregator.updateObjectInUse(is, false);
1✔
1900
        }
1✔
1901
      }
1902

1903
      final InternalSubchannel internalSubchannel = new InternalSubchannel(
1✔
1904
          args.getAddresses(),
1✔
1905
          authority(),
1✔
1906
          userAgent,
1✔
1907
          backoffPolicyProvider,
1✔
1908
          transportFactory,
1✔
1909
          transportFactory.getScheduledExecutorService(),
1✔
1910
          stopwatchSupplier,
1✔
1911
          syncContext,
1912
          new ManagedInternalSubchannelCallback(),
1913
          channelz,
1✔
1914
          callTracerFactory.create(),
1✔
1915
          subchannelTracer,
1916
          subchannelLogId,
1917
          subchannelLogger,
1918
          transportFilters);
1✔
1919

1920
      channelTracer.reportEvent(new ChannelTrace.Event.Builder()
1✔
1921
          .setDescription("Child Subchannel started")
1✔
1922
          .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
1✔
1923
          .setTimestampNanos(timeProvider.currentTimeNanos())
1✔
1924
          .setSubchannelRef(internalSubchannel)
1✔
1925
          .build());
1✔
1926

1927
      this.subchannel = internalSubchannel;
1✔
1928
      channelz.addSubchannel(internalSubchannel);
1✔
1929
      subchannels.add(internalSubchannel);
1✔
1930
    }
1✔
1931

1932
    @Override
1933
    InternalInstrumented<ChannelStats> getInstrumentedInternalSubchannel() {
1934
      checkState(started, "not started");
1✔
1935
      return subchannel;
1✔
1936
    }
1937

1938
    @Override
1939
    public void shutdown() {
1940
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1941
      if (subchannel == null) {
1✔
1942
        // start() was not successful
1943
        shutdown = true;
×
1944
        return;
×
1945
      }
1946
      if (shutdown) {
1✔
1947
        if (terminating && delayedShutdownTask != null) {
1✔
1948
          // shutdown() was previously called when terminating == false, thus a delayed shutdown()
1949
          // was scheduled.  Now since terminating == true, We should expedite the shutdown.
1950
          delayedShutdownTask.cancel();
×
1951
          delayedShutdownTask = null;
×
1952
          // Will fall through to the subchannel.shutdown() at the end.
1953
        } else {
1954
          return;
1✔
1955
        }
1956
      } else {
1957
        shutdown = true;
1✔
1958
      }
1959
      // Add a delay to shutdown to deal with the race between 1) a transport being picked and
1960
      // newStream() being called on it, and 2) its Subchannel is shut down by LoadBalancer (e.g.,
1961
      // because of address change, or because LoadBalancer is shutdown by Channel entering idle
1962
      // mode). If (2) wins, the app will see a spurious error. We work around this by delaying
1963
      // shutdown of Subchannel for a few seconds here.
1964
      //
1965
      // TODO(zhangkun83): consider a better approach
1966
      // (https://github.com/grpc/grpc-java/issues/2562).
1967
      if (!terminating) {
1✔
1968
        final class ShutdownSubchannel implements Runnable {
1✔
1969
          @Override
1970
          public void run() {
1971
            subchannel.shutdown(SUBCHANNEL_SHUTDOWN_STATUS);
1✔
1972
          }
1✔
1973
        }
1974

1975
        delayedShutdownTask = syncContext.schedule(
1✔
1976
            new LogExceptionRunnable(new ShutdownSubchannel()),
1977
            SUBCHANNEL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS,
1978
            transportFactory.getScheduledExecutorService());
1✔
1979
        return;
1✔
1980
      }
1981
      // When terminating == true, no more real streams will be created. It's safe and also
1982
      // desirable to shutdown timely.
1983
      subchannel.shutdown(SHUTDOWN_STATUS);
1✔
1984
    }
1✔
1985

1986
    @Override
1987
    public void requestConnection() {
1988
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1989
      checkState(started, "not started");
1✔
1990
      subchannel.obtainActiveTransport();
1✔
1991
    }
1✔
1992

1993
    @Override
1994
    public List<EquivalentAddressGroup> getAllAddresses() {
1995
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1996
      checkState(started, "not started");
1✔
1997
      return addressGroups;
1✔
1998
    }
1999

2000
    @Override
2001
    public Attributes getAttributes() {
2002
      return args.getAttributes();
1✔
2003
    }
2004

2005
    @Override
2006
    public String toString() {
2007
      return subchannelLogId.toString();
1✔
2008
    }
2009

2010
    @Override
2011
    public Channel asChannel() {
2012
      checkState(started, "not started");
1✔
2013
      return new SubchannelChannel(
1✔
2014
          subchannel, balancerRpcExecutorHolder.getExecutor(),
1✔
2015
          transportFactory.getScheduledExecutorService(),
1✔
2016
          callTracerFactory.create(),
1✔
2017
          new AtomicReference<InternalConfigSelector>(null));
2018
    }
2019

2020
    @Override
2021
    public Object getInternalSubchannel() {
2022
      checkState(started, "Subchannel is not started");
1✔
2023
      return subchannel;
1✔
2024
    }
2025

2026
    @Override
2027
    public ChannelLogger getChannelLogger() {
2028
      return subchannelLogger;
1✔
2029
    }
2030

2031
    @Override
2032
    public void updateAddresses(List<EquivalentAddressGroup> addrs) {
2033
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
2034
      addressGroups = addrs;
1✔
2035
      if (authorityOverride != null) {
1✔
2036
        addrs = stripOverrideAuthorityAttributes(addrs);
1✔
2037
      }
2038
      subchannel.updateAddresses(addrs);
1✔
2039
    }
1✔
2040

2041
    private List<EquivalentAddressGroup> stripOverrideAuthorityAttributes(
2042
        List<EquivalentAddressGroup> eags) {
2043
      List<EquivalentAddressGroup> eagsWithoutOverrideAttr = new ArrayList<>();
1✔
2044
      for (EquivalentAddressGroup eag : eags) {
1✔
2045
        EquivalentAddressGroup eagWithoutOverrideAttr = new EquivalentAddressGroup(
1✔
2046
            eag.getAddresses(),
1✔
2047
            eag.getAttributes().toBuilder().discard(ATTR_AUTHORITY_OVERRIDE).build());
1✔
2048
        eagsWithoutOverrideAttr.add(eagWithoutOverrideAttr);
1✔
2049
      }
1✔
2050
      return Collections.unmodifiableList(eagsWithoutOverrideAttr);
1✔
2051
    }
2052
  }
2053

2054
  @Override
2055
  public String toString() {
2056
    return MoreObjects.toStringHelper(this)
1✔
2057
        .add("logId", logId.getId())
1✔
2058
        .add("target", target)
1✔
2059
        .toString();
1✔
2060
  }
2061

2062
  /**
2063
   * Called from syncContext.
2064
   */
2065
  private final class DelayedTransportListener implements ManagedClientTransport.Listener {
1✔
2066
    @Override
2067
    public void transportShutdown(Status s) {
2068
      checkState(shutdown.get(), "Channel must have been shut down");
1✔
2069
    }
1✔
2070

2071
    @Override
2072
    public void transportReady() {
2073
      // Don't care
2074
    }
×
2075

2076
    @Override
2077
    public Attributes filterTransport(Attributes attributes) {
2078
      return attributes;
×
2079
    }
2080

2081
    @Override
2082
    public void transportInUse(final boolean inUse) {
2083
      inUseStateAggregator.updateObjectInUse(delayedTransport, inUse);
1✔
2084
    }
1✔
2085

2086
    @Override
2087
    public void transportTerminated() {
2088
      checkState(shutdown.get(), "Channel must have been shut down");
1✔
2089
      terminating = true;
1✔
2090
      shutdownNameResolverAndLoadBalancer(false);
1✔
2091
      // No need to call channelStateManager since we are already in SHUTDOWN state.
2092
      // Until LoadBalancer is shutdown, it may still create new subchannels.  We catch them
2093
      // here.
2094
      maybeShutdownNowSubchannels();
1✔
2095
      maybeTerminateChannel();
1✔
2096
    }
1✔
2097
  }
2098

2099
  /**
2100
   * Must be accessed from syncContext.
2101
   */
2102
  private final class IdleModeStateAggregator extends InUseStateAggregator<Object> {
1✔
2103
    @Override
2104
    protected void handleInUse() {
2105
      exitIdleMode();
1✔
2106
    }
1✔
2107

2108
    @Override
2109
    protected void handleNotInUse() {
2110
      if (shutdown.get()) {
1✔
2111
        return;
1✔
2112
      }
2113
      rescheduleIdleTimer();
1✔
2114
    }
1✔
2115
  }
2116

2117
  /**
2118
   * Lazily request for Executor from an executor pool.
2119
   * Also act as an Executor directly to simply run a cmd
2120
   */
2121
  @VisibleForTesting
2122
  static final class ExecutorHolder implements Executor {
2123
    private final ObjectPool<? extends Executor> pool;
2124
    private Executor executor;
2125

2126
    ExecutorHolder(ObjectPool<? extends Executor> executorPool) {
1✔
2127
      this.pool = checkNotNull(executorPool, "executorPool");
1✔
2128
    }
1✔
2129

2130
    synchronized Executor getExecutor() {
2131
      if (executor == null) {
1✔
2132
        executor = checkNotNull(pool.getObject(), "%s.getObject()", executor);
1✔
2133
      }
2134
      return executor;
1✔
2135
    }
2136

2137
    synchronized void release() {
2138
      if (executor != null) {
1✔
2139
        executor = pool.returnObject(executor);
1✔
2140
      }
2141
    }
1✔
2142

2143
    @Override
2144
    public void execute(Runnable command) {
2145
      getExecutor().execute(command);
1✔
2146
    }
1✔
2147
  }
2148

2149
  private static final class RestrictedScheduledExecutor implements ScheduledExecutorService {
2150
    final ScheduledExecutorService delegate;
2151

2152
    private RestrictedScheduledExecutor(ScheduledExecutorService delegate) {
1✔
2153
      this.delegate = checkNotNull(delegate, "delegate");
1✔
2154
    }
1✔
2155

2156
    @Override
2157
    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
2158
      return delegate.schedule(callable, delay, unit);
×
2159
    }
2160

2161
    @Override
2162
    public ScheduledFuture<?> schedule(Runnable cmd, long delay, TimeUnit unit) {
2163
      return delegate.schedule(cmd, delay, unit);
1✔
2164
    }
2165

2166
    @Override
2167
    public ScheduledFuture<?> scheduleAtFixedRate(
2168
        Runnable command, long initialDelay, long period, TimeUnit unit) {
2169
      return delegate.scheduleAtFixedRate(command, initialDelay, period, unit);
1✔
2170
    }
2171

2172
    @Override
2173
    public ScheduledFuture<?> scheduleWithFixedDelay(
2174
        Runnable command, long initialDelay, long delay, TimeUnit unit) {
2175
      return delegate.scheduleWithFixedDelay(command, initialDelay, delay, unit);
×
2176
    }
2177

2178
    @Override
2179
    public boolean awaitTermination(long timeout, TimeUnit unit)
2180
        throws InterruptedException {
2181
      return delegate.awaitTermination(timeout, unit);
×
2182
    }
2183

2184
    @Override
2185
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
2186
        throws InterruptedException {
2187
      return delegate.invokeAll(tasks);
×
2188
    }
2189

2190
    @Override
2191
    public <T> List<Future<T>> invokeAll(
2192
        Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
2193
        throws InterruptedException {
2194
      return delegate.invokeAll(tasks, timeout, unit);
×
2195
    }
2196

2197
    @Override
2198
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
2199
        throws InterruptedException, ExecutionException {
2200
      return delegate.invokeAny(tasks);
×
2201
    }
2202

2203
    @Override
2204
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
2205
        throws InterruptedException, ExecutionException, TimeoutException {
2206
      return delegate.invokeAny(tasks, timeout, unit);
×
2207
    }
2208

2209
    @Override
2210
    public boolean isShutdown() {
2211
      return delegate.isShutdown();
×
2212
    }
2213

2214
    @Override
2215
    public boolean isTerminated() {
2216
      return delegate.isTerminated();
×
2217
    }
2218

2219
    @Override
2220
    public void shutdown() {
2221
      throw new UnsupportedOperationException("Restricted: shutdown() is not allowed");
1✔
2222
    }
2223

2224
    @Override
2225
    public List<Runnable> shutdownNow() {
2226
      throw new UnsupportedOperationException("Restricted: shutdownNow() is not allowed");
1✔
2227
    }
2228

2229
    @Override
2230
    public <T> Future<T> submit(Callable<T> task) {
2231
      return delegate.submit(task);
×
2232
    }
2233

2234
    @Override
2235
    public Future<?> submit(Runnable task) {
2236
      return delegate.submit(task);
×
2237
    }
2238

2239
    @Override
2240
    public <T> Future<T> submit(Runnable task, T result) {
2241
      return delegate.submit(task, result);
×
2242
    }
2243

2244
    @Override
2245
    public void execute(Runnable command) {
2246
      delegate.execute(command);
×
2247
    }
×
2248
  }
2249

2250
  /**
2251
   * A ResolutionState indicates the status of last name resolution.
2252
   */
2253
  enum ResolutionState {
1✔
2254
    NO_RESOLUTION,
1✔
2255
    SUCCESS,
1✔
2256
    ERROR
1✔
2257
  }
2258
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc