• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19908

16 Jul 2025 07:54PM UTC coverage: 88.593% (+0.07%) from 88.528%
#19908

push

github

ejona86
Revert "xds: Convert CdsLb to XdsDepManager"

This reverts commit 297ab05ef.

b/430347751 shows multiple concerning behaviors in the xDS stack with
the new A74 config update model. XdsDepManager and CdsLB2 still seem to
be working correctly, but the change has exacerbated issues in other
parts of the stack, like RingHashConfig not having equals fixed in
a8de9f07ab.

Revert only for the v1.74.x release, leaving it on master.

34647 of 39108 relevant lines covered (88.59%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.43
/../core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
1
/*
2
 * Copyright 2016 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static com.google.common.base.Preconditions.checkState;
22
import static io.grpc.ClientStreamTracer.NAME_RESOLUTION_DELAYED;
23
import static io.grpc.ConnectivityState.CONNECTING;
24
import static io.grpc.ConnectivityState.IDLE;
25
import static io.grpc.ConnectivityState.SHUTDOWN;
26
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
27
import static io.grpc.EquivalentAddressGroup.ATTR_AUTHORITY_OVERRIDE;
28

29
import com.google.common.annotations.VisibleForTesting;
30
import com.google.common.base.MoreObjects;
31
import com.google.common.base.Stopwatch;
32
import com.google.common.base.Supplier;
33
import com.google.common.util.concurrent.ListenableFuture;
34
import com.google.common.util.concurrent.SettableFuture;
35
import com.google.errorprone.annotations.concurrent.GuardedBy;
36
import io.grpc.Attributes;
37
import io.grpc.CallCredentials;
38
import io.grpc.CallOptions;
39
import io.grpc.Channel;
40
import io.grpc.ChannelCredentials;
41
import io.grpc.ChannelLogger;
42
import io.grpc.ChannelLogger.ChannelLogLevel;
43
import io.grpc.ClientCall;
44
import io.grpc.ClientInterceptor;
45
import io.grpc.ClientInterceptors;
46
import io.grpc.ClientStreamTracer;
47
import io.grpc.ClientTransportFilter;
48
import io.grpc.CompressorRegistry;
49
import io.grpc.ConnectivityState;
50
import io.grpc.ConnectivityStateInfo;
51
import io.grpc.Context;
52
import io.grpc.Deadline;
53
import io.grpc.DecompressorRegistry;
54
import io.grpc.EquivalentAddressGroup;
55
import io.grpc.ForwardingChannelBuilder2;
56
import io.grpc.ForwardingClientCall;
57
import io.grpc.Grpc;
58
import io.grpc.InternalChannelz;
59
import io.grpc.InternalChannelz.ChannelStats;
60
import io.grpc.InternalChannelz.ChannelTrace;
61
import io.grpc.InternalConfigSelector;
62
import io.grpc.InternalInstrumented;
63
import io.grpc.InternalLogId;
64
import io.grpc.InternalWithLogId;
65
import io.grpc.LoadBalancer;
66
import io.grpc.LoadBalancer.CreateSubchannelArgs;
67
import io.grpc.LoadBalancer.PickResult;
68
import io.grpc.LoadBalancer.PickSubchannelArgs;
69
import io.grpc.LoadBalancer.ResolvedAddresses;
70
import io.grpc.LoadBalancer.SubchannelPicker;
71
import io.grpc.LoadBalancer.SubchannelStateListener;
72
import io.grpc.ManagedChannel;
73
import io.grpc.ManagedChannelBuilder;
74
import io.grpc.Metadata;
75
import io.grpc.MethodDescriptor;
76
import io.grpc.MetricInstrumentRegistry;
77
import io.grpc.MetricRecorder;
78
import io.grpc.NameResolver;
79
import io.grpc.NameResolver.ConfigOrError;
80
import io.grpc.NameResolver.ResolutionResult;
81
import io.grpc.NameResolverProvider;
82
import io.grpc.NameResolverRegistry;
83
import io.grpc.ProxyDetector;
84
import io.grpc.Status;
85
import io.grpc.StatusOr;
86
import io.grpc.SynchronizationContext;
87
import io.grpc.SynchronizationContext.ScheduledHandle;
88
import io.grpc.internal.AutoConfiguredLoadBalancerFactory.AutoConfiguredLoadBalancer;
89
import io.grpc.internal.ClientCallImpl.ClientStreamProvider;
90
import io.grpc.internal.ClientTransportFactory.SwapChannelCredentialsResult;
91
import io.grpc.internal.ManagedChannelImplBuilder.ClientTransportFactoryBuilder;
92
import io.grpc.internal.ManagedChannelImplBuilder.FixedPortProvider;
93
import io.grpc.internal.ManagedChannelServiceConfig.MethodInfo;
94
import io.grpc.internal.ManagedChannelServiceConfig.ServiceConfigConvertedSelector;
95
import io.grpc.internal.RetriableStream.ChannelBufferMeter;
96
import io.grpc.internal.RetriableStream.Throttle;
97
import java.net.URI;
98
import java.util.ArrayList;
99
import java.util.Collection;
100
import java.util.Collections;
101
import java.util.HashSet;
102
import java.util.LinkedHashSet;
103
import java.util.List;
104
import java.util.Map;
105
import java.util.Set;
106
import java.util.concurrent.Callable;
107
import java.util.concurrent.CountDownLatch;
108
import java.util.concurrent.ExecutionException;
109
import java.util.concurrent.Executor;
110
import java.util.concurrent.Future;
111
import java.util.concurrent.ScheduledExecutorService;
112
import java.util.concurrent.ScheduledFuture;
113
import java.util.concurrent.TimeUnit;
114
import java.util.concurrent.TimeoutException;
115
import java.util.concurrent.atomic.AtomicBoolean;
116
import java.util.concurrent.atomic.AtomicReference;
117
import java.util.logging.Level;
118
import java.util.logging.Logger;
119
import javax.annotation.Nullable;
120
import javax.annotation.concurrent.ThreadSafe;
121

122
/** A communication channel for making outgoing RPCs. */
123
@ThreadSafe
124
final class ManagedChannelImpl extends ManagedChannel implements
125
    InternalInstrumented<ChannelStats> {
126
  @VisibleForTesting
127
  static final Logger logger = Logger.getLogger(ManagedChannelImpl.class.getName());
1✔
128

129
  static final long IDLE_TIMEOUT_MILLIS_DISABLE = -1;
130

131
  static final long SUBCHANNEL_SHUTDOWN_DELAY_SECONDS = 5;
132

133
  @VisibleForTesting
134
  static final Status SHUTDOWN_NOW_STATUS =
1✔
135
      Status.UNAVAILABLE.withDescription("Channel shutdownNow invoked");
1✔
136

137
  @VisibleForTesting
138
  static final Status SHUTDOWN_STATUS =
1✔
139
      Status.UNAVAILABLE.withDescription("Channel shutdown invoked");
1✔
140

141
  @VisibleForTesting
142
  static final Status SUBCHANNEL_SHUTDOWN_STATUS =
1✔
143
      Status.UNAVAILABLE.withDescription("Subchannel shutdown invoked");
1✔
144

145
  private static final ManagedChannelServiceConfig EMPTY_SERVICE_CONFIG =
146
      ManagedChannelServiceConfig.empty();
1✔
147
  private static final InternalConfigSelector INITIAL_PENDING_SELECTOR =
1✔
148
      new InternalConfigSelector() {
1✔
149
        @Override
150
        public Result selectConfig(PickSubchannelArgs args) {
151
          throw new IllegalStateException("Resolution is pending");
×
152
        }
153
      };
154
  private static final LoadBalancer.PickDetailsConsumer NOOP_PICK_DETAILS_CONSUMER =
1✔
155
      new LoadBalancer.PickDetailsConsumer() {};
1✔
156

157
  private final InternalLogId logId;
158
  private final String target;
159
  @Nullable
160
  private final String authorityOverride;
161
  private final NameResolverRegistry nameResolverRegistry;
162
  private final URI targetUri;
163
  private final NameResolverProvider nameResolverProvider;
164
  private final NameResolver.Args nameResolverArgs;
165
  private final AutoConfiguredLoadBalancerFactory loadBalancerFactory;
166
  private final ClientTransportFactory originalTransportFactory;
167
  @Nullable
168
  private final ChannelCredentials originalChannelCreds;
169
  private final ClientTransportFactory transportFactory;
170
  private final ClientTransportFactory oobTransportFactory;
171
  private final RestrictedScheduledExecutor scheduledExecutor;
172
  private final Executor executor;
173
  private final ObjectPool<? extends Executor> executorPool;
174
  private final ObjectPool<? extends Executor> balancerRpcExecutorPool;
175
  private final ExecutorHolder balancerRpcExecutorHolder;
176
  private final ExecutorHolder offloadExecutorHolder;
177
  private final TimeProvider timeProvider;
178
  private final int maxTraceEvents;
179

180
  @VisibleForTesting
1✔
181
  final SynchronizationContext syncContext = new SynchronizationContext(
182
      new Thread.UncaughtExceptionHandler() {
1✔
183
        @Override
184
        public void uncaughtException(Thread t, Throwable e) {
185
          logger.log(
1✔
186
              Level.SEVERE,
187
              "[" + getLogId() + "] Uncaught exception in the SynchronizationContext. Panic!",
1✔
188
              e);
189
          try {
190
            panic(e);
1✔
191
          } catch (Throwable anotherT) {
×
192
            logger.log(
×
193
                Level.SEVERE, "[" + getLogId() + "] Uncaught exception while panicking", anotherT);
×
194
          }
1✔
195
        }
1✔
196
      });
197

198
  private boolean fullStreamDecompression;
199

200
  private final DecompressorRegistry decompressorRegistry;
201
  private final CompressorRegistry compressorRegistry;
202

203
  private final Supplier<Stopwatch> stopwatchSupplier;
204
  /** The timeout before entering idle mode. */
205
  private final long idleTimeoutMillis;
206

207
  private final ConnectivityStateManager channelStateManager = new ConnectivityStateManager();
1✔
208
  private final BackoffPolicy.Provider backoffPolicyProvider;
209

210
  /**
211
   * We delegate to this channel, so that we can have interceptors as necessary. If there aren't
212
   * any interceptors and the {@link io.grpc.BinaryLog} is {@code null} then this will just be a
213
   * {@link RealChannel}.
214
   */
215
  private final Channel interceptorChannel;
216

217
  private final List<ClientTransportFilter> transportFilters;
218
  @Nullable private final String userAgent;
219

220
  // Only null after channel is terminated. Must be assigned from the syncContext.
221
  private NameResolver nameResolver;
222

223
  // Must be accessed from the syncContext.
224
  private boolean nameResolverStarted;
225

226
  // null when channel is in idle mode.  Must be assigned from syncContext.
227
  @Nullable
228
  private LbHelperImpl lbHelper;
229

230
  // Must be accessed from the syncContext
231
  private boolean panicMode;
232

233
  // Must be mutated from syncContext
234
  // If any monitoring hook to be added later needs to get a snapshot of this Set, we could
235
  // switch to a ConcurrentHashMap.
236
  private final Set<InternalSubchannel> subchannels = new HashSet<>(16, .75f);
1✔
237

238
  // Must be accessed from syncContext
239
  @Nullable
240
  private Collection<RealChannel.PendingCall<?, ?>> pendingCalls;
241
  private final Object pendingCallsInUseObject = new Object();
1✔
242

243
  // Must be mutated from syncContext
244
  private final Set<OobChannel> oobChannels = new HashSet<>(1, .75f);
1✔
245

246
  // reprocess() must be run from syncContext
247
  private final DelayedClientTransport delayedTransport;
248
  private final UncommittedRetriableStreamsRegistry uncommittedRetriableStreamsRegistry
1✔
249
      = new UncommittedRetriableStreamsRegistry();
250

251
  // Shutdown states.
252
  //
253
  // Channel's shutdown process:
254
  // 1. shutdown(): stop accepting new calls from applications
255
  //   1a shutdown <- true
256
  //   1b delayedTransport.shutdown()
257
  // 2. delayedTransport terminated: stop stream-creation functionality
258
  //   2a terminating <- true
259
  //   2b loadBalancer.shutdown()
260
  //     * LoadBalancer will shutdown subchannels and OOB channels
261
  //   2c loadBalancer <- null
262
  //   2d nameResolver.shutdown()
263
  //   2e nameResolver <- null
264
  // 3. All subchannels and OOB channels terminated: Channel considered terminated
265

266
  private final AtomicBoolean shutdown = new AtomicBoolean(false);
1✔
267
  // Must only be mutated and read from syncContext
268
  private boolean shutdownNowed;
269
  // Must only be mutated from syncContext
270
  private boolean terminating;
271
  // Must be mutated from syncContext
272
  private volatile boolean terminated;
273
  private final CountDownLatch terminatedLatch = new CountDownLatch(1);
1✔
274

275
  private final CallTracer.Factory callTracerFactory;
276
  private final CallTracer channelCallTracer;
277
  private final ChannelTracer channelTracer;
278
  private final ChannelLogger channelLogger;
279
  private final InternalChannelz channelz;
280
  private final RealChannel realChannel;
281
  // Must be mutated and read from syncContext
282
  // a flag for doing channel tracing when flipped
283
  private ResolutionState lastResolutionState = ResolutionState.NO_RESOLUTION;
1✔
284
  // Must be mutated and read from constructor or syncContext
285
  // used for channel tracing when value changed
286
  private ManagedChannelServiceConfig lastServiceConfig = EMPTY_SERVICE_CONFIG;
1✔
287

288
  @Nullable
289
  private final ManagedChannelServiceConfig defaultServiceConfig;
290
  // Must be mutated and read from constructor or syncContext
291
  private boolean serviceConfigUpdated = false;
1✔
292
  private final boolean lookUpServiceConfig;
293

294
  // One instance per channel.
295
  private final ChannelBufferMeter channelBufferUsed = new ChannelBufferMeter();
1✔
296

297
  private final long perRpcBufferLimit;
298
  private final long channelBufferLimit;
299

300
  // Temporary false flag that can skip the retry code path.
301
  private final boolean retryEnabled;
302

303
  private final Deadline.Ticker ticker = Deadline.getSystemTicker();
1✔
304

305
  // Called from syncContext
306
  private final ManagedClientTransport.Listener delayedTransportListener =
1✔
307
      new DelayedTransportListener();
308

309
  // Calls shutdownNow(SHUTDOWN_NOW_STATUS) on every tracked subchannel and on the internal
  // subchannel of every OOB channel, but only when shutdownNowed is set; otherwise a no-op.
  // Must be called from syncContext
310
  private void maybeShutdownNowSubchannels() {
311
    if (shutdownNowed) {
1✔
312
      for (InternalSubchannel subchannel : subchannels) {
1✔
313
        subchannel.shutdownNow(SHUTDOWN_NOW_STATUS);
1✔
314
      }
1✔
315
      for (OobChannel oobChannel : oobChannels) {
1✔
316
        oobChannel.getInternalSubchannel().shutdownNow(SHUTDOWN_NOW_STATUS);
1✔
317
      }
1✔
318
    }
319
  }
1✔
320

321
  // Must be accessed from syncContext
322
  @VisibleForTesting
1✔
323
  final InUseStateAggregator<Object> inUseStateAggregator = new IdleModeStateAggregator();
324

325
  /**
   * Builds a {@link ChannelStats} snapshot for this channel.
   *
   * <p>The snapshot is assembled by a task submitted to {@code syncContext}; the returned future
   * is completed with the built stats when that task runs.
   *
   * @return a future that receives the stats (target, connectivity state, call/trace data and the
   *     list of subchannels and OOB channels)
   */
  @Override
326
  public ListenableFuture<ChannelStats> getStats() {
327
    final SettableFuture<ChannelStats> ret = SettableFuture.create();
1✔
328
    final class StatsFetcher implements Runnable {
1✔
329
      @Override
330
      public void run() {
331
        ChannelStats.Builder builder = new InternalChannelz.ChannelStats.Builder();
1✔
332
        channelCallTracer.updateBuilder(builder);
1✔
333
        channelTracer.updateBuilder(builder);
1✔
334
        builder.setTarget(target).setState(channelStateManager.getState());
1✔
335
        // Subchannels and OOB channels are reported together as a single list of children.
        List<InternalWithLogId> children = new ArrayList<>();
1✔
336
        children.addAll(subchannels);
1✔
337
        children.addAll(oobChannels);
1✔
338
        builder.setSubchannels(children);
1✔
339
        ret.set(builder.build());
1✔
340
      }
1✔
341
    }
342

343
    // subchannels and oobchannels can only be accessed from syncContext
344
    syncContext.execute(new StatsFetcher());
1✔
345
    return ret;
1✔
346
  }
347

348
  /** Returns the {@link InternalLogId} stored in this channel's {@code logId} field. */
  @Override
349
  public InternalLogId getLogId() {
350
    return logId;
1✔
351
  }
352

353
  // Task that puts the channel into idle mode by calling enterIdleMode(), unless lbHelper is
  // already null, in which case it returns without doing anything.
  // Run from syncContext
354
  private class IdleModeTimer implements Runnable {
1✔
355

356
    @Override
357
    public void run() {
358
      // Workaround timer scheduled while in idle mode. This can happen from handleNotInUse() after
359
      // an explicit enterIdleMode() by the user. Protecting here as other locations are a bit too
360
      // subtle to change rapidly to resolve the channel panic. See #8714
361
      if (lbHelper == null) {
1✔
362
        return;
×
363
      }
364
      enterIdleMode();
1✔
365
    }
1✔
366
  }
367

368
  // Shuts down the current name resolver and load balancer.
  //
  // channelIsActive == true: the resolver must have been started and lbHelper must be non-null
  // (checkState throws IllegalStateException otherwise); the resolver that was shut down is
  // replaced by a fresh one from getNameResolver() so it can be started again later.
  // channelIsActive == false: nameResolver is set to null instead.
  // In both cases lbHelper.lb is shut down and lbHelper is cleared.
  //
  // Must be called from syncContext
369
  private void shutdownNameResolverAndLoadBalancer(boolean channelIsActive) {
370
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
371
    if (channelIsActive) {
1✔
372
      checkState(nameResolverStarted, "nameResolver is not started");
1✔
373
      checkState(lbHelper != null, "lbHelper is null");
1✔
374
    }
375
    if (nameResolver != null) {
1✔
376
      nameResolver.shutdown();
1✔
377
      nameResolverStarted = false;
1✔
378
      if (channelIsActive) {
1✔
379
        nameResolver = getNameResolver(
1✔
380
            targetUri, authorityOverride, nameResolverProvider, nameResolverArgs);
381
      } else {
382
        nameResolver = null;
1✔
383
      }
384
    }
385
    if (lbHelper != null) {
1✔
386
      lbHelper.lb.shutdown();
1✔
387
      lbHelper = null;
1✔
388
    }
389
  }
1✔
390

391
  /**
392
   * Make the channel exit idle mode, if it's in it.
393
   *
   * <p>Returns immediately when the channel has been shut down or is in panic mode. Otherwise the
   * idle timer is cancelled (channel in use) or rescheduled (channel not in use), and, if no
   * {@code lbHelper} exists yet, a new load balancer is created, the channel state moves to
   * CONNECTING and the name resolver is started.
   *
394
   * <p>Must be called from syncContext
395
   */
396
  @VisibleForTesting
397
  void exitIdleMode() {
398
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
399
    if (shutdown.get() || panicMode) {
1✔
400
      return;
1✔
401
    }
402
    if (inUseStateAggregator.isInUse()) {
1✔
403
      // Cancel the timer now, so that a racing due timer will not put Channel on idleness
404
      // when the caller of exitIdleMode() is about to use the returned loadBalancer.
405
      cancelIdleTimer(false);
1✔
406
    } else {
407
      // exitIdleMode() may be called outside of inUseStateAggregator.handleNotInUse() while
408
      // isInUse() == false, in which case we still need to schedule the timer.
409
      rescheduleIdleTimer();
1✔
410
    }
411
    // A non-null lbHelper means a load balancer already exists, so there is nothing left to do.
    if (lbHelper != null) {
1✔
412
      return;
1✔
413
    }
414
    channelLogger.log(ChannelLogLevel.INFO, "Exiting idle mode");
1✔
415
    LbHelperImpl lbHelper = new LbHelperImpl();
1✔
416
    lbHelper.lb = loadBalancerFactory.newLoadBalancer(lbHelper);
1✔
417
    // Delay setting lbHelper until fully initialized, since loadBalancerFactory is user code and
418
    // may throw. We don't want to confuse our state, even if we will enter panic mode.
419
    this.lbHelper = lbHelper;
1✔
420

421
    channelStateManager.gotoState(CONNECTING);
1✔
422
    NameResolverListener listener = new NameResolverListener(lbHelper, nameResolver);
1✔
423
    nameResolver.start(listener);
1✔
424
    nameResolverStarted = true;
1✔
425
  }
1✔
426

427
  // Puts the channel into idle mode: shuts down the name resolver and load balancer (the resolver
  // is re-created for later use), calls delayedTransport.reprocess(null), and moves the channel
  // state to IDLE. Immediately exits idle mode again if pending calls or the delayed transport
  // are still reported as in use.
  // Must be run from syncContext
428
  private void enterIdleMode() {
429
    // nameResolver and loadBalancer are guaranteed to be non-null.  If any of them were null,
430
    // either the idleModeTimer ran twice without exiting the idle mode, or the task in shutdown()
431
    // did not cancel idleModeTimer, or enterIdle() ran while shutdown or in idle, all of
432
    // which are bugs.
433
    shutdownNameResolverAndLoadBalancer(true);
1✔
434
    delayedTransport.reprocess(null);
1✔
435
    channelLogger.log(ChannelLogLevel.INFO, "Entering IDLE state");
1✔
436
    channelStateManager.gotoState(IDLE);
1✔
437
    // If the inUseStateAggregator still considers pending calls to be queued up or the delayed
438
    // transport to be holding some we need to exit idle mode to give these calls a chance to
439
    // be processed.
440
    if (inUseStateAggregator.anyObjectInUse(pendingCallsInUseObject, delayedTransport)) {
1✔
441
      exitIdleMode();
1✔
442
    }
443
  }
1✔
444

445
  // Cancels the idle timer by forwarding the 'permanent' flag to idleTimer.cancel(); shutdown()
  // passes true, exitIdleMode() passes false.
  // Must be run from syncContext
446
  private void cancelIdleTimer(boolean permanent) {
447
    idleTimer.cancel(permanent);
1✔
448
  }
1✔
449

450
  // Reschedules the idle timer to fire after idleTimeoutMillis milliseconds. Does nothing when
  // the idle timeout is disabled (idleTimeoutMillis == IDLE_TIMEOUT_MILLIS_DISABLE).
  // Always run from syncContext
451
  private void rescheduleIdleTimer() {
452
    if (idleTimeoutMillis == IDLE_TIMEOUT_MILLIS_DISABLE) {
1✔
453
      return;
1✔
454
    }
455
    idleTimer.reschedule(idleTimeoutMillis, TimeUnit.MILLISECONDS);
1✔
456
  }
1✔
457

458
  /**
459
   * Force name resolution refresh to happen immediately. Must be run
460
   * from syncContext.
   *
   * <p>Does nothing if the name resolver has not been started ({@code nameResolverStarted} is
   * false).
461
   */
462
  private void refreshNameResolution() {
463
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
464
    if (nameResolverStarted) {
1✔
465
      nameResolver.refresh();
1✔
466
    }
467
  }
1✔
468

469
  /**
   * {@link ClientStreamProvider} for this channel. Every stream is ultimately created through
   * {@code delayedTransport.newStream(...)}, either directly (retries disabled) or from within a
   * {@link RetriableStream} subclass (retries enabled).
   */
  private final class ChannelStreamProvider implements ClientStreamProvider {
1✔
470
    // Handed to every RetryStream created by newStream().
    volatile Throttle throttle;
471

472
    @Override
473
    public ClientStream newStream(
474
        final MethodDescriptor<?, ?> method,
475
        final CallOptions callOptions,
476
        final Metadata headers,
477
        final Context context) {
478
      // There is no need to reschedule the idle timer here. If the channel isn't shut down, either
479
      // the delayed transport or a real transport will go in-use and cancel the idle timer.
480
      if (!retryEnabled) {
1✔
481
        ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers(
1✔
482
            callOptions, headers, 0, /* isTransparentRetry= */ false);
483
        // Create the stream with the call's Context attached, restoring the previous one after.
        Context origContext = context.attach();
1✔
484
        try {
485
          return delayedTransport.newStream(method, headers, callOptions, tracers);
1✔
486
        } finally {
487
          context.detach(origContext);
1✔
488
        }
489
      } else {
490
        // Retry and hedging policies come from the MethodInfo call option, when one is present.
        MethodInfo methodInfo = callOptions.getOption(MethodInfo.KEY);
1✔
491
        final RetryPolicy retryPolicy = methodInfo == null ? null : methodInfo.retryPolicy;
1✔
492
        final HedgingPolicy hedgingPolicy = methodInfo == null ? null : methodInfo.hedgingPolicy;
1✔
493
        final class RetryStream<ReqT> extends RetriableStream<ReqT> {
494
          @SuppressWarnings("unchecked")
495
          RetryStream() {
1✔
496
            super(
1✔
497
                (MethodDescriptor<ReqT, ?>) method,
498
                headers,
499
                channelBufferUsed,
1✔
500
                perRpcBufferLimit,
1✔
501
                channelBufferLimit,
1✔
502
                getCallExecutor(callOptions),
1✔
503
                transportFactory.getScheduledExecutorService(),
1✔
504
                retryPolicy,
505
                hedgingPolicy,
506
                throttle);
507
          }
1✔
508

509
          // Registers this stream as uncommitted; the registry's returned Status is passed back.
          @Override
510
          Status prestart() {
511
            return uncommittedRetriableStreamsRegistry.add(this);
1✔
512
          }
513

514
          // Removes this stream from the uncommitted registry.
          @Override
515
          void postCommit() {
516
            uncommittedRetriableStreamsRegistry.remove(this);
1✔
517
          }
1✔
518

519
          // Creates one attempt's stream on the delayed transport, using the given tracer factory
          // and the call's Context.
          @Override
520
          ClientStream newSubstream(
521
              Metadata newHeaders, ClientStreamTracer.Factory factory, int previousAttempts,
522
              boolean isTransparentRetry) {
523
            CallOptions newOptions = callOptions.withStreamTracerFactory(factory);
1✔
524
            ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers(
1✔
525
                newOptions, newHeaders, previousAttempts, isTransparentRetry);
526
            Context origContext = context.attach();
1✔
527
            try {
528
              return delayedTransport.newStream(method, newHeaders, newOptions, tracers);
1✔
529
            } finally {
530
              context.detach(origContext);
1✔
531
            }
532
          }
533
        }
534

535
        return new RetryStream<>();
1✔
536
      }
537
    }
538
  }
539

540
  private final ChannelStreamProvider transportProvider = new ChannelStreamProvider();
1✔
541

542
  private final Rescheduler idleTimer;
543
  private final MetricRecorder metricRecorder;
544

545
  /**
   * Creates a channel from the given builder and collaborators.
   *
   * <p>Besides copying settings from {@code builder}, the constructor creates the transport
   * factories, tracing/logging helpers, name resolver, delayed transport, idle timer and the
   * (possibly intercepted) {@link RealChannel}, and registers the channel with channelz.
   *
   * @param builder source of most channel settings; {@code target}, {@code executorPool},
   *     {@code offloadExecutorPool}, the compressor/decompressor registries and {@code channelz}
   *     must be non-null
   * @param clientTransportFactory transport factory that the channel wraps
   * @param targetUri URI handed to {@code nameResolverProvider}; must be non-null
   * @param nameResolverProvider creates the name resolver; must be non-null
   * @param backoffPolicyProvider stored in the {@code backoffPolicyProvider} field
   * @param balancerRpcExecutorPool executor pool for balancer RPCs; must be non-null
   * @param stopwatchSupplier supplies the stopwatch for the idle timer; must be non-null
   * @param interceptors interceptors applied on top of the real channel
   * @param timeProvider time source for tracing and call tracers; must be non-null
   * @throws IllegalStateException if the builder's default service config fails to parse
   * @throws IllegalArgumentException if an enabled idle timeout is below the minimum, or no name
   *     resolver can be created for {@code targetUri}
   */
  ManagedChannelImpl(
546
      ManagedChannelImplBuilder builder,
547
      ClientTransportFactory clientTransportFactory,
548
      URI targetUri,
549
      NameResolverProvider nameResolverProvider,
550
      BackoffPolicy.Provider backoffPolicyProvider,
551
      ObjectPool<? extends Executor> balancerRpcExecutorPool,
552
      Supplier<Stopwatch> stopwatchSupplier,
553
      List<ClientInterceptor> interceptors,
554
      final TimeProvider timeProvider) {
1✔
555
    this.target = checkNotNull(builder.target, "target");
1✔
556
    this.logId = InternalLogId.allocate("Channel", target);
1✔
557
    this.timeProvider = checkNotNull(timeProvider, "timeProvider");
1✔
558
    this.executorPool = checkNotNull(builder.executorPool, "executorPool");
1✔
559
    this.executor = checkNotNull(executorPool.getObject(), "executor");
1✔
560
    this.originalChannelCreds = builder.channelCredentials;
1✔
561
    this.originalTransportFactory = clientTransportFactory;
1✔
562
    this.offloadExecutorHolder =
1✔
563
        new ExecutorHolder(checkNotNull(builder.offloadExecutorPool, "offloadExecutorPool"));
1✔
564
    // Regular transports get the builder's call credentials; the OOB transport factory is
    // created with null call credentials.
    this.transportFactory = new CallCredentialsApplyingTransportFactory(
1✔
565
        clientTransportFactory, builder.callCredentials, this.offloadExecutorHolder);
566
    this.oobTransportFactory = new CallCredentialsApplyingTransportFactory(
1✔
567
        clientTransportFactory, null, this.offloadExecutorHolder);
568
    this.scheduledExecutor =
1✔
569
        new RestrictedScheduledExecutor(transportFactory.getScheduledExecutorService());
1✔
570
    maxTraceEvents = builder.maxTraceEvents;
1✔
571
    channelTracer = new ChannelTracer(
1✔
572
        logId, builder.maxTraceEvents, timeProvider.currentTimeNanos(),
1✔
573
        "Channel for '" + target + "'");
574
    channelLogger = new ChannelLoggerImpl(channelTracer, timeProvider);
1✔
575
    // Fall back to the default proxy detector when the builder does not supply one.
    ProxyDetector proxyDetector =
576
        builder.proxyDetector != null ? builder.proxyDetector : GrpcUtil.DEFAULT_PROXY_DETECTOR;
1✔
577
    this.retryEnabled = builder.retryEnabled;
1✔
578
    this.loadBalancerFactory = new AutoConfiguredLoadBalancerFactory(builder.defaultLbPolicy);
1✔
579
    this.nameResolverRegistry = builder.nameResolverRegistry;
1✔
580
    this.targetUri = checkNotNull(targetUri, "targetUri");
1✔
581
    this.nameResolverProvider = checkNotNull(nameResolverProvider, "nameResolverProvider");
1✔
582
    ScParser serviceConfigParser =
1✔
583
        new ScParser(
584
            retryEnabled,
585
            builder.maxRetryAttempts,
586
            builder.maxHedgedAttempts,
587
            loadBalancerFactory);
588
    this.authorityOverride = builder.authorityOverride;
1✔
589
    this.metricRecorder = new MetricRecorderImpl(builder.metricSinks,
1✔
590
        MetricInstrumentRegistry.getDefaultRegistry());
1✔
591
    // Arguments shared by every name resolver this channel creates (see getNameResolver()).
    NameResolver.Args.Builder nameResolverArgsBuilder = NameResolver.Args.newBuilder()
1✔
592
            .setDefaultPort(builder.getDefaultPort())
1✔
593
            .setProxyDetector(proxyDetector)
1✔
594
            .setSynchronizationContext(syncContext)
1✔
595
            .setScheduledExecutorService(scheduledExecutor)
1✔
596
            .setServiceConfigParser(serviceConfigParser)
1✔
597
            .setChannelLogger(channelLogger)
1✔
598
            .setOffloadExecutor(this.offloadExecutorHolder)
1✔
599
            .setOverrideAuthority(this.authorityOverride)
1✔
600
            .setMetricRecorder(this.metricRecorder);
1✔
601
    builder.copyAllNameResolverCustomArgsTo(nameResolverArgsBuilder);
1✔
602
    this.nameResolverArgs = nameResolverArgsBuilder.build();
1✔
603
    this.nameResolver = getNameResolver(
1✔
604
        targetUri, authorityOverride, nameResolverProvider, nameResolverArgs);
605
    this.balancerRpcExecutorPool = checkNotNull(balancerRpcExecutorPool, "balancerRpcExecutorPool");
1✔
606
    this.balancerRpcExecutorHolder = new ExecutorHolder(balancerRpcExecutorPool);
1✔
607
    this.delayedTransport = new DelayedClientTransport(this.executor, this.syncContext);
1✔
608
    this.delayedTransport.start(delayedTransportListener);
1✔
609
    this.backoffPolicyProvider = backoffPolicyProvider;
1✔
610

611
    // Parse the optional default service config up front; a parse error fails construction.
    if (builder.defaultServiceConfig != null) {
1✔
612
      ConfigOrError parsedDefaultServiceConfig =
1✔
613
          serviceConfigParser.parseServiceConfig(builder.defaultServiceConfig);
1✔
614
      checkState(
1✔
615
          parsedDefaultServiceConfig.getError() == null,
1✔
616
          "Default config is invalid: %s",
617
          parsedDefaultServiceConfig.getError());
1✔
618
      this.defaultServiceConfig =
1✔
619
          (ManagedChannelServiceConfig) parsedDefaultServiceConfig.getConfig();
1✔
620
      this.transportProvider.throttle = this.defaultServiceConfig.getRetryThrottling();
1✔
621
    } else {
1✔
622
      this.defaultServiceConfig = null;
1✔
623
    }
624
    this.lookUpServiceConfig = builder.lookUpServiceConfig;
1✔
625
    realChannel = new RealChannel(nameResolver.getServiceAuthority());
1✔
626
    // Layer the optional binlog wrapper and then the interceptors on top of the real channel.
    Channel channel = realChannel;
1✔
627
    if (builder.binlog != null) {
1✔
628
      channel = builder.binlog.wrapChannel(channel);
1✔
629
    }
630
    this.interceptorChannel = ClientInterceptors.intercept(channel, interceptors);
1✔
631
    this.transportFilters = new ArrayList<>(builder.transportFilters);
1✔
632
    this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier");
1✔
633
    // An enabled idle timeout must be at least IDLE_MODE_MIN_TIMEOUT_MILLIS; the "disabled"
    // sentinel is accepted as-is.
    if (builder.idleTimeoutMillis == IDLE_TIMEOUT_MILLIS_DISABLE) {
1✔
634
      this.idleTimeoutMillis = builder.idleTimeoutMillis;
1✔
635
    } else {
636
      checkArgument(
1✔
637
          builder.idleTimeoutMillis
638
              >= ManagedChannelImplBuilder.IDLE_MODE_MIN_TIMEOUT_MILLIS,
639
          "invalid idleTimeoutMillis %s", builder.idleTimeoutMillis);
640
      this.idleTimeoutMillis = builder.idleTimeoutMillis;
1✔
641
    }
642

643
    idleTimer = new Rescheduler(
1✔
644
        new IdleModeTimer(),
645
        syncContext,
646
        transportFactory.getScheduledExecutorService(),
1✔
647
        stopwatchSupplier.get());
1✔
648
    this.fullStreamDecompression = builder.fullStreamDecompression;
1✔
649
    this.decompressorRegistry = checkNotNull(builder.decompressorRegistry, "decompressorRegistry");
1✔
650
    this.compressorRegistry = checkNotNull(builder.compressorRegistry, "compressorRegistry");
1✔
651
    this.userAgent = builder.userAgent;
1✔
652

653
    this.channelBufferLimit = builder.retryBufferSize;
1✔
654
    this.perRpcBufferLimit = builder.perRpcBufferLimit;
1✔
655
    // Factory producing CallTracers that all use this channel's timeProvider.
    final class ChannelCallTracerFactory implements CallTracer.Factory {
1✔
656
      @Override
657
      public CallTracer create() {
658
        return new CallTracer(timeProvider);
1✔
659
      }
660
    }
661

662
    this.callTracerFactory = new ChannelCallTracerFactory();
1✔
663
    channelCallTracer = callTracerFactory.create();
1✔
664
    this.channelz = checkNotNull(builder.channelz);
1✔
665
    channelz.addRootChannel(this);
1✔
666

667
    // With service config look-up disabled, the service config is marked as updated right away.
    if (!lookUpServiceConfig) {
1✔
668
      if (defaultServiceConfig != null) {
1✔
669
        channelLogger.log(
1✔
670
            ChannelLogLevel.INFO, "Service config look-up disabled, using default service config");
671
      }
672
      serviceConfigUpdated = true;
1✔
673
    }
674
  }
1✔
675

676
  /**
   * Creates the name resolver for {@code targetUri} using {@code provider}, wrapped in a
   * {@link RetryingNameResolver}.
   *
   * @param targetUri the target to resolve
   * @param overrideAuthority if non-null, the returned resolver reports this value from
   *     {@code getServiceAuthority()} instead of the underlying resolver's authority
   * @param provider creates the underlying resolver
   * @param nameResolverArgs passed to the provider; also supplies the scheduled executor and
   *     synchronization context used for retries
   * @return the wrapped resolver
   * @throws IllegalArgumentException if {@code provider} returns {@code null} for
   *     {@code targetUri}
   */
  @VisibleForTesting
677
  static NameResolver getNameResolver(
678
      URI targetUri, @Nullable final String overrideAuthority,
679
      NameResolverProvider provider, NameResolver.Args nameResolverArgs) {
680
    NameResolver resolver = provider.newNameResolver(targetUri, nameResolverArgs);
1✔
681
    if (resolver == null) {
1✔
682
      throw new IllegalArgumentException("cannot create a NameResolver for " + targetUri);
1✔
683
    }
684

685
    // We wrap the name resolver in a RetryingNameResolver to give it the ability to retry failures.
686
    // TODO: After a transition period, all NameResolver implementations that need retry should use
687
    //       RetryingNameResolver directly and this step can be removed.
688
    NameResolver usedNameResolver = new RetryingNameResolver(resolver,
1✔
689
          new BackoffPolicyRetryScheduler(new ExponentialBackoffPolicy.Provider(),
690
              nameResolverArgs.getScheduledExecutorService(),
1✔
691
              nameResolverArgs.getSynchronizationContext()),
1✔
692
          nameResolverArgs.getSynchronizationContext());
1✔
693

694
    if (overrideAuthority == null) {
1✔
695
      return usedNameResolver;
1✔
696
    }
697

698
    // Only the reported service authority is overridden; everything else is forwarded.
    return new ForwardingNameResolver(usedNameResolver) {
1✔
699
      @Override
700
      public String getServiceAuthority() {
701
        return overrideAuthority;
1✔
702
      }
703
    };
704
  }
705

706
  @VisibleForTesting
707
  InternalConfigSelector getConfigSelector() {
708
    return realChannel.configSelector.get();
1✔
709
  }
710
  
711
  @VisibleForTesting
712
  boolean hasThrottle() {
713
    return this.transportProvider.throttle != null;
1✔
714
  }
715

716
  /**
717
   * Initiates an orderly shutdown in which preexisting calls continue but new calls are immediately
718
   * cancelled.
719
   */
720
  @Override
721
  public ManagedChannelImpl shutdown() {
722
    channelLogger.log(ChannelLogLevel.DEBUG, "shutdown() called");
1✔
723
    if (!shutdown.compareAndSet(false, true)) {
1✔
724
      return this;
1✔
725
    }
726
    final class Shutdown implements Runnable {
1✔
727
      @Override
728
      public void run() {
729
        channelLogger.log(ChannelLogLevel.INFO, "Entering SHUTDOWN state");
1✔
730
        channelStateManager.gotoState(SHUTDOWN);
1✔
731
      }
1✔
732
    }
733

734
    syncContext.execute(new Shutdown());
1✔
735
    realChannel.shutdown();
1✔
736
    final class CancelIdleTimer implements Runnable {
1✔
737
      @Override
738
      public void run() {
739
        cancelIdleTimer(/* permanent= */ true);
1✔
740
      }
1✔
741
    }
742

743
    syncContext.execute(new CancelIdleTimer());
1✔
744
    return this;
1✔
745
  }
746

747
  /**
748
   * Initiates a forceful shutdown in which preexisting and new calls are cancelled. Although
749
   * forceful, the shutdown process is still not instantaneous; {@link #isTerminated()} will likely
750
   * return {@code false} immediately after this method returns.
751
   */
752
  @Override
753
  public ManagedChannelImpl shutdownNow() {
754
    channelLogger.log(ChannelLogLevel.DEBUG, "shutdownNow() called");
1✔
755
    shutdown();
1✔
756
    realChannel.shutdownNow();
1✔
757
    final class ShutdownNow implements Runnable {
1✔
758
      @Override
759
      public void run() {
760
        if (shutdownNowed) {
1✔
761
          return;
1✔
762
        }
763
        shutdownNowed = true;
1✔
764
        maybeShutdownNowSubchannels();
1✔
765
      }
1✔
766
    }
767

768
    syncContext.execute(new ShutdownNow());
1✔
769
    return this;
1✔
770
  }
771

772
  // Called from syncContext
773
  @VisibleForTesting
774
  void panic(final Throwable t) {
775
    if (panicMode) {
1✔
776
      // Preserve the first panic information
777
      return;
×
778
    }
779
    panicMode = true;
1✔
780
    try {
781
      cancelIdleTimer(/* permanent= */ true);
1✔
782
      shutdownNameResolverAndLoadBalancer(false);
1✔
783
    } finally {
784
      updateSubchannelPicker(new LoadBalancer.FixedResultPicker(PickResult.withDrop(
1✔
785
          Status.INTERNAL.withDescription("Panic! This is a bug!").withCause(t))));
1✔
786
      realChannel.updateConfigSelector(null);
1✔
787
      channelLogger.log(ChannelLogLevel.ERROR, "PANIC! Entering TRANSIENT_FAILURE");
1✔
788
      channelStateManager.gotoState(TRANSIENT_FAILURE);
1✔
789
    }
790
  }
1✔
791

792
  @VisibleForTesting
793
  boolean isInPanicMode() {
794
    return panicMode;
1✔
795
  }
796

797
  // Called from syncContext
798
  private void updateSubchannelPicker(SubchannelPicker newPicker) {
799
    delayedTransport.reprocess(newPicker);
1✔
800
  }
1✔
801

802
  @Override
803
  public boolean isShutdown() {
804
    return shutdown.get();
1✔
805
  }
806

807
  @Override
808
  public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
809
    return terminatedLatch.await(timeout, unit);
1✔
810
  }
811

812
  @Override
813
  public boolean isTerminated() {
814
    return terminated;
1✔
815
  }
816

817
  /*
818
   * Creates a new outgoing call on the channel.
819
   */
820
  @Override
821
  public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(MethodDescriptor<ReqT, RespT> method,
822
      CallOptions callOptions) {
823
    return interceptorChannel.newCall(method, callOptions);
1✔
824
  }
825

826
  @Override
827
  public String authority() {
828
    return interceptorChannel.authority();
1✔
829
  }
830

831
  private Executor getCallExecutor(CallOptions callOptions) {
832
    Executor executor = callOptions.getExecutor();
1✔
833
    if (executor == null) {
1✔
834
      executor = this.executor;
1✔
835
    }
836
    return executor;
1✔
837
  }
838

839
  private class RealChannel extends Channel {
840
    // Reference to null if no config selector is available from resolution result
841
    // Reference must be set() from syncContext
842
    private final AtomicReference<InternalConfigSelector> configSelector =
1✔
843
        new AtomicReference<>(INITIAL_PENDING_SELECTOR);
1✔
844
    // Set when the NameResolver is initially created. When we create a new NameResolver for the
845
    // same target, the new instance must have the same value.
846
    private final String authority;
847

848
    private final Channel clientCallImplChannel = new Channel() {
1✔
849
      @Override
850
      public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
851
          MethodDescriptor<RequestT, ResponseT> method, CallOptions callOptions) {
852
        return new ClientCallImpl<>(
1✔
853
            method,
854
            getCallExecutor(callOptions),
1✔
855
            callOptions,
856
            transportProvider,
1✔
857
            terminated ? null : transportFactory.getScheduledExecutorService(),
1✔
858
            channelCallTracer,
1✔
859
            null)
860
            .setFullStreamDecompression(fullStreamDecompression)
1✔
861
            .setDecompressorRegistry(decompressorRegistry)
1✔
862
            .setCompressorRegistry(compressorRegistry);
1✔
863
      }
864

865
      @Override
866
      public String authority() {
867
        return authority;
×
868
      }
869
    };
870

871
    private RealChannel(String authority) {
1✔
872
      this.authority =  checkNotNull(authority, "authority");
1✔
873
    }
1✔
874

875
    @Override
876
    public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(
877
        MethodDescriptor<ReqT, RespT> method, CallOptions callOptions) {
878
      if (configSelector.get() != INITIAL_PENDING_SELECTOR) {
1✔
879
        return newClientCall(method, callOptions);
1✔
880
      }
881
      syncContext.execute(new Runnable() {
1✔
882
        @Override
883
        public void run() {
884
          exitIdleMode();
1✔
885
        }
1✔
886
      });
887
      if (configSelector.get() != INITIAL_PENDING_SELECTOR) {
1✔
888
        // This is an optimization for the case (typically with InProcessTransport) when name
889
        // resolution result is immediately available at this point. Otherwise, some users'
890
        // tests might observe slight behavior difference from earlier grpc versions.
891
        return newClientCall(method, callOptions);
1✔
892
      }
893
      if (shutdown.get()) {
1✔
894
        // Return a failing ClientCall.
895
        return new ClientCall<ReqT, RespT>() {
×
896
          @Override
897
          public void start(Listener<RespT> responseListener, Metadata headers) {
898
            responseListener.onClose(SHUTDOWN_STATUS, new Metadata());
×
899
          }
×
900

901
          @Override public void request(int numMessages) {}
×
902

903
          @Override public void cancel(@Nullable String message, @Nullable Throwable cause) {}
×
904

905
          @Override public void halfClose() {}
×
906

907
          @Override public void sendMessage(ReqT message) {}
×
908
        };
909
      }
910
      Context context = Context.current();
1✔
911
      final PendingCall<ReqT, RespT> pendingCall = new PendingCall<>(context, method, callOptions);
1✔
912
      syncContext.execute(new Runnable() {
1✔
913
        @Override
914
        public void run() {
915
          if (configSelector.get() == INITIAL_PENDING_SELECTOR) {
1✔
916
            if (pendingCalls == null) {
1✔
917
              pendingCalls = new LinkedHashSet<>();
1✔
918
              inUseStateAggregator.updateObjectInUse(pendingCallsInUseObject, true);
1✔
919
            }
920
            pendingCalls.add(pendingCall);
1✔
921
          } else {
922
            pendingCall.reprocess();
1✔
923
          }
924
        }
1✔
925
      });
926
      return pendingCall;
1✔
927
    }
928

929
    // Must run in SynchronizationContext.
930
    void updateConfigSelector(@Nullable InternalConfigSelector config) {
931
      InternalConfigSelector prevConfig = configSelector.get();
1✔
932
      configSelector.set(config);
1✔
933
      if (prevConfig == INITIAL_PENDING_SELECTOR && pendingCalls != null) {
1✔
934
        for (RealChannel.PendingCall<?, ?> pendingCall : pendingCalls) {
1✔
935
          pendingCall.reprocess();
1✔
936
        }
1✔
937
      }
938
    }
1✔
939

940
    // Must run in SynchronizationContext.
941
    void onConfigError() {
942
      if (configSelector.get() == INITIAL_PENDING_SELECTOR) {
1✔
943
        // Apply Default Service Config if initial name resolution fails.
944
        if (defaultServiceConfig != null) {
1✔
945
          updateConfigSelector(defaultServiceConfig.getDefaultConfigSelector());
1✔
946
          lastServiceConfig = defaultServiceConfig;
1✔
947
          channelLogger.log(ChannelLogLevel.ERROR,
1✔
948
              "Initial Name Resolution error, using default service config");
949
        } else {
950
          updateConfigSelector(null);
1✔
951
        }
952
      }
953
    }
1✔
954

955
    void shutdown() {
956
      final class RealChannelShutdown implements Runnable {
1✔
957
        @Override
958
        public void run() {
959
          if (pendingCalls == null) {
1✔
960
            if (configSelector.get() == INITIAL_PENDING_SELECTOR) {
1✔
961
              configSelector.set(null);
1✔
962
            }
963
            uncommittedRetriableStreamsRegistry.onShutdown(SHUTDOWN_STATUS);
1✔
964
          }
965
        }
1✔
966
      }
967

968
      syncContext.execute(new RealChannelShutdown());
1✔
969
    }
1✔
970

971
    void shutdownNow() {
972
      final class RealChannelShutdownNow implements Runnable {
1✔
973
        @Override
974
        public void run() {
975
          if (configSelector.get() == INITIAL_PENDING_SELECTOR) {
1✔
976
            configSelector.set(null);
1✔
977
          }
978
          if (pendingCalls != null) {
1✔
979
            for (RealChannel.PendingCall<?, ?> pendingCall : pendingCalls) {
1✔
980
              pendingCall.cancel("Channel is forcefully shutdown", null);
1✔
981
            }
1✔
982
          }
983
          uncommittedRetriableStreamsRegistry.onShutdownNow(SHUTDOWN_NOW_STATUS);
1✔
984
        }
1✔
985
      }
986

987
      syncContext.execute(new RealChannelShutdownNow());
1✔
988
    }
1✔
989

990
    @Override
991
    public String authority() {
992
      return authority;
1✔
993
    }
994

995
    private final class PendingCall<ReqT, RespT> extends DelayedClientCall<ReqT, RespT> {
996
      final Context context;
997
      final MethodDescriptor<ReqT, RespT> method;
998
      final CallOptions callOptions;
999
      private final long callCreationTime;
1000

1001
      PendingCall(
1002
          Context context, MethodDescriptor<ReqT, RespT> method, CallOptions callOptions) {
1✔
1003
        super(getCallExecutor(callOptions), scheduledExecutor, callOptions.getDeadline());
1✔
1004
        this.context = context;
1✔
1005
        this.method = method;
1✔
1006
        this.callOptions = callOptions;
1✔
1007
        this.callCreationTime = ticker.nanoTime();
1✔
1008
      }
1✔
1009

1010
      /** Called when it's ready to create a real call and reprocess the pending call. */
1011
      void reprocess() {
1012
        ClientCall<ReqT, RespT> realCall;
1013
        Context previous = context.attach();
1✔
1014
        try {
1015
          CallOptions delayResolutionOption = callOptions.withOption(NAME_RESOLUTION_DELAYED,
1✔
1016
              ticker.nanoTime() - callCreationTime);
1✔
1017
          realCall = newClientCall(method, delayResolutionOption);
1✔
1018
        } finally {
1019
          context.detach(previous);
1✔
1020
        }
1021
        Runnable toRun = setCall(realCall);
1✔
1022
        if (toRun == null) {
1✔
1023
          syncContext.execute(new PendingCallRemoval());
1✔
1024
        } else {
1025
          getCallExecutor(callOptions).execute(new Runnable() {
1✔
1026
            @Override
1027
            public void run() {
1028
              toRun.run();
1✔
1029
              syncContext.execute(new PendingCallRemoval());
1✔
1030
            }
1✔
1031
          });
1032
        }
1033
      }
1✔
1034

1035
      @Override
1036
      protected void callCancelled() {
1037
        super.callCancelled();
1✔
1038
        syncContext.execute(new PendingCallRemoval());
1✔
1039
      }
1✔
1040

1041
      final class PendingCallRemoval implements Runnable {
1✔
1042
        @Override
1043
        public void run() {
1044
          if (pendingCalls != null) {
1✔
1045
            pendingCalls.remove(PendingCall.this);
1✔
1046
            if (pendingCalls.isEmpty()) {
1✔
1047
              inUseStateAggregator.updateObjectInUse(pendingCallsInUseObject, false);
1✔
1048
              pendingCalls = null;
1✔
1049
              if (shutdown.get()) {
1✔
1050
                uncommittedRetriableStreamsRegistry.onShutdown(SHUTDOWN_STATUS);
1✔
1051
              }
1052
            }
1053
          }
1054
        }
1✔
1055
      }
1056
    }
1057

1058
    private <ReqT, RespT> ClientCall<ReqT, RespT> newClientCall(
1059
        MethodDescriptor<ReqT, RespT> method, CallOptions callOptions) {
1060
      InternalConfigSelector selector = configSelector.get();
1✔
1061
      if (selector == null) {
1✔
1062
        return clientCallImplChannel.newCall(method, callOptions);
1✔
1063
      }
1064
      if (selector instanceof ServiceConfigConvertedSelector) {
1✔
1065
        MethodInfo methodInfo =
1✔
1066
            ((ServiceConfigConvertedSelector) selector).config.getMethodConfig(method);
1✔
1067
        if (methodInfo != null) {
1✔
1068
          callOptions = callOptions.withOption(MethodInfo.KEY, methodInfo);
1✔
1069
        }
1070
        return clientCallImplChannel.newCall(method, callOptions);
1✔
1071
      }
1072
      return new ConfigSelectingClientCall<>(
1✔
1073
          selector, clientCallImplChannel, executor, method, callOptions);
1✔
1074
    }
1075
  }
1076

1077
  /**
1078
   * A client call for a given channel that applies a given config selector when it starts.
1079
   */
1080
  static final class ConfigSelectingClientCall<ReqT, RespT>
1081
      extends ForwardingClientCall<ReqT, RespT> {
1082

1083
    private final InternalConfigSelector configSelector;
1084
    private final Channel channel;
1085
    private final Executor callExecutor;
1086
    private final MethodDescriptor<ReqT, RespT> method;
1087
    private final Context context;
1088
    private CallOptions callOptions;
1089

1090
    private ClientCall<ReqT, RespT> delegate;
1091

1092
    ConfigSelectingClientCall(
1093
        InternalConfigSelector configSelector, Channel channel, Executor channelExecutor,
1094
        MethodDescriptor<ReqT, RespT> method,
1095
        CallOptions callOptions) {
1✔
1096
      this.configSelector = configSelector;
1✔
1097
      this.channel = channel;
1✔
1098
      this.method = method;
1✔
1099
      this.callExecutor =
1✔
1100
          callOptions.getExecutor() == null ? channelExecutor : callOptions.getExecutor();
1✔
1101
      this.callOptions = callOptions.withExecutor(callExecutor);
1✔
1102
      this.context = Context.current();
1✔
1103
    }
1✔
1104

1105
    @Override
1106
    protected ClientCall<ReqT, RespT> delegate() {
1107
      return delegate;
1✔
1108
    }
1109

1110
    @SuppressWarnings("unchecked")
1111
    @Override
1112
    public void start(Listener<RespT> observer, Metadata headers) {
1113
      PickSubchannelArgs args =
1✔
1114
          new PickSubchannelArgsImpl(method, headers, callOptions, NOOP_PICK_DETAILS_CONSUMER);
1✔
1115
      InternalConfigSelector.Result result = configSelector.selectConfig(args);
1✔
1116
      Status status = result.getStatus();
1✔
1117
      if (!status.isOk()) {
1✔
1118
        executeCloseObserverInContext(observer,
1✔
1119
            GrpcUtil.replaceInappropriateControlPlaneStatus(status));
1✔
1120
        delegate = (ClientCall<ReqT, RespT>) NOOP_CALL;
1✔
1121
        return;
1✔
1122
      }
1123
      ClientInterceptor interceptor = result.getInterceptor();
1✔
1124
      ManagedChannelServiceConfig config = (ManagedChannelServiceConfig) result.getConfig();
1✔
1125
      MethodInfo methodInfo = config.getMethodConfig(method);
1✔
1126
      if (methodInfo != null) {
1✔
1127
        callOptions = callOptions.withOption(MethodInfo.KEY, methodInfo);
1✔
1128
      }
1129
      if (interceptor != null) {
1✔
1130
        delegate = interceptor.interceptCall(method, callOptions, channel);
1✔
1131
      } else {
1132
        delegate = channel.newCall(method, callOptions);
×
1133
      }
1134
      delegate.start(observer, headers);
1✔
1135
    }
1✔
1136

1137
    private void executeCloseObserverInContext(
1138
        final Listener<RespT> observer, final Status status) {
1139
      class CloseInContext extends ContextRunnable {
1140
        CloseInContext() {
1✔
1141
          super(context);
1✔
1142
        }
1✔
1143

1144
        @Override
1145
        public void runInContext() {
1146
          observer.onClose(status, new Metadata());
1✔
1147
        }
1✔
1148
      }
1149

1150
      callExecutor.execute(new CloseInContext());
1✔
1151
    }
1✔
1152

1153
    @Override
1154
    public void cancel(@Nullable String message, @Nullable Throwable cause) {
1155
      if (delegate != null) {
×
1156
        delegate.cancel(message, cause);
×
1157
      }
1158
    }
×
1159
  }
1160

1161
  private static final ClientCall<Object, Object> NOOP_CALL = new ClientCall<Object, Object>() {
1✔
1162
    @Override
1163
    public void start(Listener<Object> responseListener, Metadata headers) {}
×
1164

1165
    @Override
1166
    public void request(int numMessages) {}
1✔
1167

1168
    @Override
1169
    public void cancel(String message, Throwable cause) {}
×
1170

1171
    @Override
1172
    public void halfClose() {}
×
1173

1174
    @Override
1175
    public void sendMessage(Object message) {}
×
1176

1177
    // Always returns {@code false}, since this is only used when the startup of the call fails.
1178
    @Override
1179
    public boolean isReady() {
1180
      return false;
×
1181
    }
1182
  };
1183

1184
  /**
1185
   * Terminate the channel if termination conditions are met.
1186
   */
1187
  // Must be run from syncContext
1188
  private void maybeTerminateChannel() {
1189
    if (terminated) {
1✔
1190
      return;
×
1191
    }
1192
    if (shutdown.get() && subchannels.isEmpty() && oobChannels.isEmpty()) {
1✔
1193
      channelLogger.log(ChannelLogLevel.INFO, "Terminated");
1✔
1194
      channelz.removeRootChannel(this);
1✔
1195
      executorPool.returnObject(executor);
1✔
1196
      balancerRpcExecutorHolder.release();
1✔
1197
      offloadExecutorHolder.release();
1✔
1198
      // Release the transport factory so that it can deallocate any resources.
1199
      transportFactory.close();
1✔
1200

1201
      terminated = true;
1✔
1202
      terminatedLatch.countDown();
1✔
1203
    }
1204
  }
1✔
1205

1206
  // Must be called from syncContext
1207
  private void handleInternalSubchannelState(ConnectivityStateInfo newState) {
1208
    if (newState.getState() == TRANSIENT_FAILURE || newState.getState() == IDLE) {
1✔
1209
      refreshNameResolution();
1✔
1210
    }
1211
  }
1✔
1212

1213
  @Override
1214
  public ConnectivityState getState(boolean requestConnection) {
1215
    ConnectivityState savedChannelState = channelStateManager.getState();
1✔
1216
    if (requestConnection && savedChannelState == IDLE) {
1✔
1217
      final class RequestConnection implements Runnable {
1✔
1218
        @Override
1219
        public void run() {
1220
          exitIdleMode();
1✔
1221
          if (lbHelper != null) {
1✔
1222
            lbHelper.lb.requestConnection();
1✔
1223
          }
1224
        }
1✔
1225
      }
1226

1227
      syncContext.execute(new RequestConnection());
1✔
1228
    }
1229
    return savedChannelState;
1✔
1230
  }
1231

1232
  @Override
1233
  public void notifyWhenStateChanged(final ConnectivityState source, final Runnable callback) {
1234
    final class NotifyStateChanged implements Runnable {
1✔
1235
      @Override
1236
      public void run() {
1237
        channelStateManager.notifyWhenStateChanged(callback, executor, source);
1✔
1238
      }
1✔
1239
    }
1240

1241
    syncContext.execute(new NotifyStateChanged());
1✔
1242
  }
1✔
1243

1244
  @Override
1245
  public void resetConnectBackoff() {
1246
    final class ResetConnectBackoff implements Runnable {
1✔
1247
      @Override
1248
      public void run() {
1249
        if (shutdown.get()) {
1✔
1250
          return;
1✔
1251
        }
1252
        if (nameResolverStarted) {
1✔
1253
          refreshNameResolution();
1✔
1254
        }
1255
        for (InternalSubchannel subchannel : subchannels) {
1✔
1256
          subchannel.resetConnectBackoff();
1✔
1257
        }
1✔
1258
        for (OobChannel oobChannel : oobChannels) {
1✔
1259
          oobChannel.resetConnectBackoff();
×
1260
        }
×
1261
      }
1✔
1262
    }
1263

1264
    syncContext.execute(new ResetConnectBackoff());
1✔
1265
  }
1✔
1266

1267
  @Override
1268
  public void enterIdle() {
1269
    final class PrepareToLoseNetworkRunnable implements Runnable {
1✔
1270
      @Override
1271
      public void run() {
1272
        if (shutdown.get() || lbHelper == null) {
1✔
1273
          return;
1✔
1274
        }
1275
        cancelIdleTimer(/* permanent= */ false);
1✔
1276
        enterIdleMode();
1✔
1277
      }
1✔
1278
    }
1279

1280
    syncContext.execute(new PrepareToLoseNetworkRunnable());
1✔
1281
  }
1✔
1282

1283
  /**
1284
   * A registry that prevents channel shutdown from killing existing retry attempts that are in
1285
   * backoff.
1286
   */
1287
  private final class UncommittedRetriableStreamsRegistry {
1✔
1288
    // TODO(zdapeng): This means we would acquire a lock for each new retry-able stream,
1289
    // it's worthwhile to look for a lock-free approach.
1290
    final Object lock = new Object();
1✔
1291

1292
    @GuardedBy("lock")
1✔
1293
    Collection<ClientStream> uncommittedRetriableStreams = new HashSet<>();
1294

1295
    @GuardedBy("lock")
1296
    Status shutdownStatus;
1297

1298
    void onShutdown(Status reason) {
1299
      boolean shouldShutdownDelayedTransport = false;
1✔
1300
      synchronized (lock) {
1✔
1301
        if (shutdownStatus != null) {
1✔
1302
          return;
1✔
1303
        }
1304
        shutdownStatus = reason;
1✔
1305
        // Keep the delayedTransport open until there is no more uncommitted streams, b/c those
1306
        // retriable streams, which may be in backoff and not using any transport, are already
1307
        // started RPCs.
1308
        if (uncommittedRetriableStreams.isEmpty()) {
1✔
1309
          shouldShutdownDelayedTransport = true;
1✔
1310
        }
1311
      }
1✔
1312

1313
      if (shouldShutdownDelayedTransport) {
1✔
1314
        delayedTransport.shutdown(reason);
1✔
1315
      }
1316
    }
1✔
1317

1318
    void onShutdownNow(Status reason) {
1319
      onShutdown(reason);
1✔
1320
      Collection<ClientStream> streams;
1321

1322
      synchronized (lock) {
1✔
1323
        streams = new ArrayList<>(uncommittedRetriableStreams);
1✔
1324
      }
1✔
1325

1326
      for (ClientStream stream : streams) {
1✔
1327
        stream.cancel(reason);
1✔
1328
      }
1✔
1329
      delayedTransport.shutdownNow(reason);
1✔
1330
    }
1✔
1331

1332
    /**
1333
     * Registers a RetriableStream and return null if not shutdown, otherwise just returns the
1334
     * shutdown Status.
1335
     */
1336
    @Nullable
1337
    Status add(RetriableStream<?> retriableStream) {
1338
      synchronized (lock) {
1✔
1339
        if (shutdownStatus != null) {
1✔
1340
          return shutdownStatus;
1✔
1341
        }
1342
        uncommittedRetriableStreams.add(retriableStream);
1✔
1343
        return null;
1✔
1344
      }
1345
    }
1346

1347
    void remove(RetriableStream<?> retriableStream) {
1348
      Status shutdownStatusCopy = null;
1✔
1349

1350
      synchronized (lock) {
1✔
1351
        uncommittedRetriableStreams.remove(retriableStream);
1✔
1352
        if (uncommittedRetriableStreams.isEmpty()) {
1✔
1353
          shutdownStatusCopy = shutdownStatus;
1✔
1354
          // Because retriable transport is long-lived, we take this opportunity to down-size the
1355
          // hashmap.
1356
          uncommittedRetriableStreams = new HashSet<>();
1✔
1357
        }
1358
      }
1✔
1359

1360
      if (shutdownStatusCopy != null) {
1✔
1361
        delayedTransport.shutdown(shutdownStatusCopy);
1✔
1362
      }
1363
    }
1✔
1364
  }
1365

1366
  private final class LbHelperImpl extends LoadBalancer.Helper {
1✔
1367
    AutoConfiguredLoadBalancer lb;
1368

1369
    @Override
1370
    public AbstractSubchannel createSubchannel(CreateSubchannelArgs args) {
1371
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1372
      // No new subchannel should be created after load balancer has been shutdown.
1373
      checkState(!terminating, "Channel is being terminated");
1✔
1374
      return new SubchannelImpl(args);
1✔
1375
    }
1376

1377
    @Override
1378
    public void updateBalancingState(
1379
        final ConnectivityState newState, final SubchannelPicker newPicker) {
1380
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1381
      checkNotNull(newState, "newState");
1✔
1382
      checkNotNull(newPicker, "newPicker");
1✔
1383

1384
      if (LbHelperImpl.this != lbHelper || panicMode) {
1✔
1385
        return;
1✔
1386
      }
1387
      updateSubchannelPicker(newPicker);
1✔
1388
      // It's not appropriate to report SHUTDOWN state from lb.
1389
      // Ignore the case of newState == SHUTDOWN for now.
1390
      if (newState != SHUTDOWN) {
1✔
1391
        channelLogger.log(
1✔
1392
            ChannelLogLevel.INFO, "Entering {0} state with picker: {1}", newState, newPicker);
1393
        channelStateManager.gotoState(newState);
1✔
1394
      }
1395
    }
1✔
1396

1397
    @Override
1398
    public void refreshNameResolution() {
1399
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1400
      final class LoadBalancerRefreshNameResolution implements Runnable {
1✔
1401
        @Override
1402
        public void run() {
1403
          ManagedChannelImpl.this.refreshNameResolution();
1✔
1404
        }
1✔
1405
      }
1406

1407
      syncContext.execute(new LoadBalancerRefreshNameResolution());
1✔
1408
    }
1✔
1409

1410
    @Override
1411
    public ManagedChannel createOobChannel(EquivalentAddressGroup addressGroup, String authority) {
1412
      return createOobChannel(Collections.singletonList(addressGroup), authority);
×
1413
    }
1414

1415
    @Override
1416
    public ManagedChannel createOobChannel(List<EquivalentAddressGroup> addressGroup,
1417
        String authority) {
1418
      // TODO(ejona): can we be even stricter? Like terminating?
1419
      checkState(!terminated, "Channel is terminated");
1✔
1420
      long oobChannelCreationTime = timeProvider.currentTimeNanos();
1✔
1421
      InternalLogId oobLogId = InternalLogId.allocate("OobChannel", /*details=*/ null);
1✔
1422
      InternalLogId subchannelLogId =
1✔
1423
          InternalLogId.allocate("Subchannel-OOB", /*details=*/ authority);
1✔
1424
      ChannelTracer oobChannelTracer =
1✔
1425
          new ChannelTracer(
1426
              oobLogId, maxTraceEvents, oobChannelCreationTime,
1✔
1427
              "OobChannel for " + addressGroup);
1428
      final OobChannel oobChannel = new OobChannel(
1✔
1429
          authority, balancerRpcExecutorPool, oobTransportFactory.getScheduledExecutorService(),
1✔
1430
          syncContext, callTracerFactory.create(), oobChannelTracer, channelz, timeProvider);
1✔
1431
      channelTracer.reportEvent(new ChannelTrace.Event.Builder()
1✔
1432
          .setDescription("Child OobChannel created")
1✔
1433
          .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
1✔
1434
          .setTimestampNanos(oobChannelCreationTime)
1✔
1435
          .setChannelRef(oobChannel)
1✔
1436
          .build());
1✔
1437
      ChannelTracer subchannelTracer =
1✔
1438
          new ChannelTracer(subchannelLogId, maxTraceEvents, oobChannelCreationTime,
1✔
1439
              "Subchannel for " + addressGroup);
1440
      ChannelLogger subchannelLogger = new ChannelLoggerImpl(subchannelTracer, timeProvider);
1✔
1441
      final class ManagedOobChannelCallback extends InternalSubchannel.Callback {
1✔
1442
        @Override
1443
        void onTerminated(InternalSubchannel is) {
1444
          oobChannels.remove(oobChannel);
1✔
1445
          channelz.removeSubchannel(is);
1✔
1446
          oobChannel.handleSubchannelTerminated();
1✔
1447
          maybeTerminateChannel();
1✔
1448
        }
1✔
1449

1450
        @Override
1451
        void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) {
1452
          // TODO(chengyuanzhang): change to let LB policies explicitly manage OOB channel's
1453
          //  state and refresh name resolution if necessary.
1454
          handleInternalSubchannelState(newState);
1✔
1455
          oobChannel.handleSubchannelStateChange(newState);
1✔
1456
        }
1✔
1457
      }
1458

1459
      final InternalSubchannel internalSubchannel = new InternalSubchannel(
1✔
1460
          CreateSubchannelArgs.newBuilder().setAddresses(addressGroup).build(),
1✔
1461
          authority, userAgent, backoffPolicyProvider, oobTransportFactory,
1✔
1462
          oobTransportFactory.getScheduledExecutorService(), stopwatchSupplier, syncContext,
1✔
1463
          // All callback methods are run from syncContext
1464
          new ManagedOobChannelCallback(),
1465
          channelz,
1✔
1466
          callTracerFactory.create(),
1✔
1467
          subchannelTracer,
1468
          subchannelLogId,
1469
          subchannelLogger,
1470
          transportFilters);
1✔
1471
      oobChannelTracer.reportEvent(new ChannelTrace.Event.Builder()
1✔
1472
          .setDescription("Child Subchannel created")
1✔
1473
          .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
1✔
1474
          .setTimestampNanos(oobChannelCreationTime)
1✔
1475
          .setSubchannelRef(internalSubchannel)
1✔
1476
          .build());
1✔
1477
      channelz.addSubchannel(oobChannel);
1✔
1478
      channelz.addSubchannel(internalSubchannel);
1✔
1479
      oobChannel.setSubchannel(internalSubchannel);
1✔
1480
      final class AddOobChannel implements Runnable {
1✔
1481
        @Override
1482
        public void run() {
1483
          if (terminating) {
1✔
1484
            oobChannel.shutdown();
×
1485
          }
1486
          if (!terminated) {
1✔
1487
            // If channel has not terminated, it will track the subchannel and block termination
1488
            // for it.
1489
            oobChannels.add(oobChannel);
1✔
1490
          }
1491
        }
1✔
1492
      }
1493

1494
      syncContext.execute(new AddOobChannel());
1✔
1495
      return oobChannel;
1✔
1496
    }
1497

1498
    @Deprecated
1499
    @Override
1500
    public ManagedChannelBuilder<?> createResolvingOobChannelBuilder(String target) {
1501
      return createResolvingOobChannelBuilder(target, new DefaultChannelCreds())
1✔
1502
          // Override authority to keep the old behavior.
1503
          // createResolvingOobChannelBuilder(String target) will be deleted soon.
1504
          .overrideAuthority(getAuthority());
1✔
1505
    }
1506

1507
    // TODO(creamsoup) prevent main channel to shutdown if oob channel is not terminated
1508
    // TODO(zdapeng) register the channel as a subchannel of the parent channel in channelz.
1509
    @Override
1510
    public ManagedChannelBuilder<?> createResolvingOobChannelBuilder(
1511
        final String target, final ChannelCredentials channelCreds) {
1512
      checkNotNull(channelCreds, "channelCreds");
1✔
1513

1514
      final class ResolvingOobChannelBuilder
1515
          extends ForwardingChannelBuilder2<ResolvingOobChannelBuilder> {
1516
        final ManagedChannelBuilder<?> delegate;
1517

1518
        ResolvingOobChannelBuilder() {
1✔
1519
          final ClientTransportFactory transportFactory;
1520
          CallCredentials callCredentials;
1521
          if (channelCreds instanceof DefaultChannelCreds) {
1✔
1522
            transportFactory = originalTransportFactory;
1✔
1523
            callCredentials = null;
1✔
1524
          } else {
1525
            SwapChannelCredentialsResult swapResult =
1✔
1526
                originalTransportFactory.swapChannelCredentials(channelCreds);
1✔
1527
            if (swapResult == null) {
1✔
1528
              delegate = Grpc.newChannelBuilder(target, channelCreds);
×
1529
              return;
×
1530
            } else {
1531
              transportFactory = swapResult.transportFactory;
1✔
1532
              callCredentials = swapResult.callCredentials;
1✔
1533
            }
1534
          }
1535
          ClientTransportFactoryBuilder transportFactoryBuilder =
1✔
1536
              new ClientTransportFactoryBuilder() {
1✔
1537
                @Override
1538
                public ClientTransportFactory buildClientTransportFactory() {
1539
                  return transportFactory;
1✔
1540
                }
1541
              };
1542
          delegate = new ManagedChannelImplBuilder(
1✔
1543
              target,
1544
              channelCreds,
1545
              callCredentials,
1546
              transportFactoryBuilder,
1547
              new FixedPortProvider(nameResolverArgs.getDefaultPort()))
1✔
1548
              .nameResolverRegistry(nameResolverRegistry);
1✔
1549
        }
1✔
1550

1551
        @Override
1552
        protected ManagedChannelBuilder<?> delegate() {
1553
          return delegate;
1✔
1554
        }
1555
      }
1556

1557
      checkState(!terminated, "Channel is terminated");
1✔
1558

1559
      ResolvingOobChannelBuilder builder = new ResolvingOobChannelBuilder();
1✔
1560

1561
      return builder
1✔
1562
          // TODO(zdapeng): executors should not outlive the parent channel.
1563
          .executor(executor)
1✔
1564
          .offloadExecutor(offloadExecutorHolder.getExecutor())
1✔
1565
          .maxTraceEvents(maxTraceEvents)
1✔
1566
          .proxyDetector(nameResolverArgs.getProxyDetector())
1✔
1567
          .userAgent(userAgent);
1✔
1568
    }
1569

1570
    @Override
1571
    public ChannelCredentials getUnsafeChannelCredentials() {
1572
      if (originalChannelCreds == null) {
1✔
1573
        return new DefaultChannelCreds();
1✔
1574
      }
1575
      return originalChannelCreds;
×
1576
    }
1577

1578
    @Override
1579
    public void updateOobChannelAddresses(ManagedChannel channel, EquivalentAddressGroup eag) {
1580
      updateOobChannelAddresses(channel, Collections.singletonList(eag));
×
1581
    }
×
1582

1583
    @Override
1584
    public void updateOobChannelAddresses(ManagedChannel channel,
1585
        List<EquivalentAddressGroup> eag) {
1586
      checkArgument(channel instanceof OobChannel,
1✔
1587
          "channel must have been returned from createOobChannel");
1588
      ((OobChannel) channel).updateAddresses(eag);
1✔
1589
    }
1✔
1590

1591
    @Override
1592
    public String getAuthority() {
1593
      return ManagedChannelImpl.this.authority();
1✔
1594
    }
1595

1596
    @Override
1597
    public String getChannelTarget() {
1598
      return targetUri.toString();
1✔
1599
    }
1600

1601
    @Override
1602
    public SynchronizationContext getSynchronizationContext() {
1603
      return syncContext;
1✔
1604
    }
1605

1606
    @Override
1607
    public ScheduledExecutorService getScheduledExecutorService() {
1608
      return scheduledExecutor;
1✔
1609
    }
1610

1611
    @Override
1612
    public ChannelLogger getChannelLogger() {
1613
      return channelLogger;
1✔
1614
    }
1615

1616
    @Override
1617
    public NameResolver.Args getNameResolverArgs() {
1618
      return nameResolverArgs;
1✔
1619
    }
1620

1621
    @Override
1622
    public NameResolverRegistry getNameResolverRegistry() {
1623
      return nameResolverRegistry;
1✔
1624
    }
1625

1626
    @Override
1627
    public MetricRecorder getMetricRecorder() {
1628
      return metricRecorder;
1✔
1629
    }
1630

1631
    /**
1632
     * A placeholder for channel creds if user did not specify channel creds for the channel.
1633
     */
1634
    // TODO(zdapeng): get rid of this class and let all ChannelBuilders always provide a non-null
1635
    //     channel creds.
1636
    final class DefaultChannelCreds extends ChannelCredentials {
1✔
1637
      @Override
1638
      public ChannelCredentials withoutBearerTokens() {
1639
        return this;
×
1640
      }
1641
    }
1642
  }
1643

1644
  final class NameResolverListener extends NameResolver.Listener2 {
1645
    final LbHelperImpl helper;
1646
    final NameResolver resolver;
1647

1648
    NameResolverListener(LbHelperImpl helperImpl, NameResolver resolver) {
1✔
1649
      this.helper = checkNotNull(helperImpl, "helperImpl");
1✔
1650
      this.resolver = checkNotNull(resolver, "resolver");
1✔
1651
    }
1✔
1652

1653
    @Override
1654
    public void onResult(final ResolutionResult resolutionResult) {
1655
      syncContext.execute(() -> onResult2(resolutionResult));
×
1656
    }
×
1657

1658
    @SuppressWarnings("ReferenceEquality")
1659
    @Override
1660
    public Status onResult2(final ResolutionResult resolutionResult) {
1661
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1662
      if (ManagedChannelImpl.this.nameResolver != resolver) {
1✔
1663
        return Status.OK;
1✔
1664
      }
1665

1666
      StatusOr<List<EquivalentAddressGroup>> serversOrError =
1✔
1667
          resolutionResult.getAddressesOrError();
1✔
1668
      if (!serversOrError.hasValue()) {
1✔
1669
        handleErrorInSyncContext(serversOrError.getStatus());
1✔
1670
        return serversOrError.getStatus();
1✔
1671
      }
1672
      List<EquivalentAddressGroup> servers = serversOrError.getValue();
1✔
1673
      channelLogger.log(
1✔
1674
          ChannelLogLevel.DEBUG,
1675
          "Resolved address: {0}, config={1}",
1676
          servers,
1677
          resolutionResult.getAttributes());
1✔
1678

1679
      if (lastResolutionState != ResolutionState.SUCCESS) {
1✔
1680
        channelLogger.log(ChannelLogLevel.INFO, "Address resolved: {0}",
1✔
1681
            servers);
1682
        lastResolutionState = ResolutionState.SUCCESS;
1✔
1683
      }
1684
      ConfigOrError configOrError = resolutionResult.getServiceConfig();
1✔
1685
      InternalConfigSelector resolvedConfigSelector =
1✔
1686
          resolutionResult.getAttributes().get(InternalConfigSelector.KEY);
1✔
1687
      ManagedChannelServiceConfig validServiceConfig =
1688
          configOrError != null && configOrError.getConfig() != null
1✔
1689
              ? (ManagedChannelServiceConfig) configOrError.getConfig()
1✔
1690
              : null;
1✔
1691
      Status serviceConfigError = configOrError != null ? configOrError.getError() : null;
1✔
1692

1693
      ManagedChannelServiceConfig effectiveServiceConfig;
1694
      if (!lookUpServiceConfig) {
1✔
1695
        if (validServiceConfig != null) {
1✔
1696
          channelLogger.log(
1✔
1697
              ChannelLogLevel.INFO,
1698
              "Service config from name resolver discarded by channel settings");
1699
        }
1700
        effectiveServiceConfig =
1701
            defaultServiceConfig == null ? EMPTY_SERVICE_CONFIG : defaultServiceConfig;
1✔
1702
        if (resolvedConfigSelector != null) {
1✔
1703
          channelLogger.log(
1✔
1704
              ChannelLogLevel.INFO,
1705
              "Config selector from name resolver discarded by channel settings");
1706
        }
1707
        realChannel.updateConfigSelector(effectiveServiceConfig.getDefaultConfigSelector());
1✔
1708
      } else {
1709
        // Try to use config if returned from name resolver
1710
        // Otherwise, try to use the default config if available
1711
        if (validServiceConfig != null) {
1✔
1712
          effectiveServiceConfig = validServiceConfig;
1✔
1713
          if (resolvedConfigSelector != null) {
1✔
1714
            realChannel.updateConfigSelector(resolvedConfigSelector);
1✔
1715
            if (effectiveServiceConfig.getDefaultConfigSelector() != null) {
1✔
1716
              channelLogger.log(
×
1717
                  ChannelLogLevel.DEBUG,
1718
                  "Method configs in service config will be discarded due to presence of"
1719
                      + "config-selector");
1720
            }
1721
          } else {
1722
            realChannel.updateConfigSelector(effectiveServiceConfig.getDefaultConfigSelector());
1✔
1723
          }
1724
        } else if (defaultServiceConfig != null) {
1✔
1725
          effectiveServiceConfig = defaultServiceConfig;
1✔
1726
          realChannel.updateConfigSelector(effectiveServiceConfig.getDefaultConfigSelector());
1✔
1727
          channelLogger.log(
1✔
1728
              ChannelLogLevel.INFO,
1729
              "Received no service config, using default service config");
1730
        } else if (serviceConfigError != null) {
1✔
1731
          if (!serviceConfigUpdated) {
1✔
1732
            // First DNS lookup has invalid service config, and cannot fall back to default
1733
            channelLogger.log(
1✔
1734
                ChannelLogLevel.INFO,
1735
                "Fallback to error due to invalid first service config without default config");
1736
            // This error could be an "inappropriate" control plane error that should not bleed
1737
            // through to client code using gRPC. We let them flow through here to the LB as
1738
            // we later check for these error codes when investigating pick results in
1739
            // GrpcUtil.getTransportFromPickResult().
1740
            onError(configOrError.getError());
1✔
1741
            return configOrError.getError();
1✔
1742
          } else {
1743
            effectiveServiceConfig = lastServiceConfig;
1✔
1744
          }
1745
        } else {
1746
          effectiveServiceConfig = EMPTY_SERVICE_CONFIG;
1✔
1747
          realChannel.updateConfigSelector(null);
1✔
1748
        }
1749
        if (!effectiveServiceConfig.equals(lastServiceConfig)) {
1✔
1750
          channelLogger.log(
1✔
1751
              ChannelLogLevel.INFO,
1752
              "Service config changed{0}",
1753
              effectiveServiceConfig == EMPTY_SERVICE_CONFIG ? " to empty" : "");
1✔
1754
          lastServiceConfig = effectiveServiceConfig;
1✔
1755
          transportProvider.throttle = effectiveServiceConfig.getRetryThrottling();
1✔
1756
        }
1757

1758
        try {
1759
          // TODO(creamsoup): when `serversOrError` is empty and lastResolutionStateCopy == SUCCESS
1760
          //  and lbNeedAddress, it shouldn't call the handleServiceConfigUpdate. But,
1761
          //  lbNeedAddress is not deterministic
1762
          serviceConfigUpdated = true;
1✔
1763
        } catch (RuntimeException re) {
×
1764
          logger.log(
×
1765
              Level.WARNING,
1766
              "[" + getLogId() + "] Unexpected exception from parsing service config",
×
1767
              re);
1768
        }
1✔
1769
      }
1770

1771
      Attributes effectiveAttrs = resolutionResult.getAttributes();
1✔
1772
      // Call LB only if it's not shutdown.  If LB is shutdown, lbHelper won't match.
1773
      if (NameResolverListener.this.helper == ManagedChannelImpl.this.lbHelper) {
1✔
1774
        Attributes.Builder attrBuilder =
1✔
1775
            effectiveAttrs.toBuilder().discard(InternalConfigSelector.KEY);
1✔
1776
        Map<String, ?> healthCheckingConfig =
1✔
1777
            effectiveServiceConfig.getHealthCheckingConfig();
1✔
1778
        if (healthCheckingConfig != null) {
1✔
1779
          attrBuilder
1✔
1780
              .set(LoadBalancer.ATTR_HEALTH_CHECKING_CONFIG, healthCheckingConfig)
1✔
1781
              .build();
1✔
1782
        }
1783
        Attributes attributes = attrBuilder.build();
1✔
1784

1785
        ResolvedAddresses.Builder resolvedAddresses = ResolvedAddresses.newBuilder()
1✔
1786
            .setAddresses(serversOrError.getValue())
1✔
1787
            .setAttributes(attributes)
1✔
1788
            .setLoadBalancingPolicyConfig(effectiveServiceConfig.getLoadBalancingConfig());
1✔
1789
        Status addressAcceptanceStatus = helper.lb.tryAcceptResolvedAddresses(
1✔
1790
            resolvedAddresses.build());
1✔
1791
        return addressAcceptanceStatus;
1✔
1792
      }
1793
      return Status.OK;
×
1794
    }
1795

1796
    @Override
1797
    public void onError(final Status error) {
1798
      checkArgument(!error.isOk(), "the error status must not be OK");
1✔
1799
      final class NameResolverErrorHandler implements Runnable {
1✔
1800
        @Override
1801
        public void run() {
1802
          handleErrorInSyncContext(error);
1✔
1803
        }
1✔
1804
      }
1805

1806
      syncContext.execute(new NameResolverErrorHandler());
1✔
1807
    }
1✔
1808

1809
    private void handleErrorInSyncContext(Status error) {
1810
      logger.log(Level.WARNING, "[{0}] Failed to resolve name. status={1}",
1✔
1811
          new Object[] {getLogId(), error});
1✔
1812
      realChannel.onConfigError();
1✔
1813
      if (lastResolutionState != ResolutionState.ERROR) {
1✔
1814
        channelLogger.log(ChannelLogLevel.WARNING, "Failed to resolve name: {0}", error);
1✔
1815
        lastResolutionState = ResolutionState.ERROR;
1✔
1816
      }
1817
      // Call LB only if it's not shutdown.  If LB is shutdown, lbHelper won't match.
1818
      if (NameResolverListener.this.helper != ManagedChannelImpl.this.lbHelper) {
1✔
1819
        return;
1✔
1820
      }
1821

1822
      helper.lb.handleNameResolutionError(error);
1✔
1823
    }
1✔
1824
  }
1825

1826
  private final class SubchannelImpl extends AbstractSubchannel {
1827
    final CreateSubchannelArgs args;
1828
    final InternalLogId subchannelLogId;
1829
    final ChannelLoggerImpl subchannelLogger;
1830
    final ChannelTracer subchannelTracer;
1831
    List<EquivalentAddressGroup> addressGroups;
1832
    InternalSubchannel subchannel;
1833
    boolean started;
1834
    boolean shutdown;
1835
    ScheduledHandle delayedShutdownTask;
1836

1837
    SubchannelImpl(CreateSubchannelArgs args) {
1✔
1838
      checkNotNull(args, "args");
1✔
1839
      addressGroups = args.getAddresses();
1✔
1840
      if (authorityOverride != null) {
1✔
1841
        List<EquivalentAddressGroup> eagsWithoutOverrideAttr =
1✔
1842
            stripOverrideAuthorityAttributes(args.getAddresses());
1✔
1843
        args = args.toBuilder().setAddresses(eagsWithoutOverrideAttr).build();
1✔
1844
      }
1845
      this.args = args;
1✔
1846
      subchannelLogId = InternalLogId.allocate("Subchannel", /*details=*/ authority());
1✔
1847
      subchannelTracer = new ChannelTracer(
1✔
1848
          subchannelLogId, maxTraceEvents, timeProvider.currentTimeNanos(),
1✔
1849
          "Subchannel for " + args.getAddresses());
1✔
1850
      subchannelLogger = new ChannelLoggerImpl(subchannelTracer, timeProvider);
1✔
1851
    }
1✔
1852

1853
    @Override
1854
    public void start(final SubchannelStateListener listener) {
1855
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1856
      checkState(!started, "already started");
1✔
1857
      checkState(!shutdown, "already shutdown");
1✔
1858
      checkState(!terminating, "Channel is being terminated");
1✔
1859
      started = true;
1✔
1860
      final class ManagedInternalSubchannelCallback extends InternalSubchannel.Callback {
1✔
1861
        // All callbacks are run in syncContext
1862
        @Override
1863
        void onTerminated(InternalSubchannel is) {
1864
          subchannels.remove(is);
1✔
1865
          channelz.removeSubchannel(is);
1✔
1866
          maybeTerminateChannel();
1✔
1867
        }
1✔
1868

1869
        @Override
1870
        void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) {
1871
          checkState(listener != null, "listener is null");
1✔
1872
          listener.onSubchannelState(newState);
1✔
1873
        }
1✔
1874

1875
        @Override
1876
        void onInUse(InternalSubchannel is) {
1877
          inUseStateAggregator.updateObjectInUse(is, true);
1✔
1878
        }
1✔
1879

1880
        @Override
1881
        void onNotInUse(InternalSubchannel is) {
1882
          inUseStateAggregator.updateObjectInUse(is, false);
1✔
1883
        }
1✔
1884
      }
1885

1886
      final InternalSubchannel internalSubchannel = new InternalSubchannel(
1✔
1887
          args,
1888
          authority(),
1✔
1889
          userAgent,
1✔
1890
          backoffPolicyProvider,
1✔
1891
          transportFactory,
1✔
1892
          transportFactory.getScheduledExecutorService(),
1✔
1893
          stopwatchSupplier,
1✔
1894
          syncContext,
1895
          new ManagedInternalSubchannelCallback(),
1896
          channelz,
1✔
1897
          callTracerFactory.create(),
1✔
1898
          subchannelTracer,
1899
          subchannelLogId,
1900
          subchannelLogger,
1901
          transportFilters);
1✔
1902

1903
      channelTracer.reportEvent(new ChannelTrace.Event.Builder()
1✔
1904
          .setDescription("Child Subchannel started")
1✔
1905
          .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
1✔
1906
          .setTimestampNanos(timeProvider.currentTimeNanos())
1✔
1907
          .setSubchannelRef(internalSubchannel)
1✔
1908
          .build());
1✔
1909

1910
      this.subchannel = internalSubchannel;
1✔
1911
      channelz.addSubchannel(internalSubchannel);
1✔
1912
      subchannels.add(internalSubchannel);
1✔
1913
    }
1✔
1914

1915
    @Override
1916
    InternalInstrumented<ChannelStats> getInstrumentedInternalSubchannel() {
1917
      checkState(started, "not started");
1✔
1918
      return subchannel;
1✔
1919
    }
1920

1921
    @Override
1922
    public void shutdown() {
1923
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1924
      if (subchannel == null) {
1✔
1925
        // start() was not successful
1926
        shutdown = true;
×
1927
        return;
×
1928
      }
1929
      if (shutdown) {
1✔
1930
        if (terminating && delayedShutdownTask != null) {
1✔
1931
          // shutdown() was previously called when terminating == false, thus a delayed shutdown()
1932
          // was scheduled.  Now since terminating == true, We should expedite the shutdown.
1933
          delayedShutdownTask.cancel();
×
1934
          delayedShutdownTask = null;
×
1935
          // Will fall through to the subchannel.shutdown() at the end.
1936
        } else {
1937
          return;
1✔
1938
        }
1939
      } else {
1940
        shutdown = true;
1✔
1941
      }
1942
      // Add a delay to shutdown to deal with the race between 1) a transport being picked and
1943
      // newStream() being called on it, and 2) its Subchannel is shut down by LoadBalancer (e.g.,
1944
      // because of address change, or because LoadBalancer is shutdown by Channel entering idle
1945
      // mode). If (2) wins, the app will see a spurious error. We work around this by delaying
1946
      // shutdown of Subchannel for a few seconds here.
1947
      //
1948
      // TODO(zhangkun83): consider a better approach
1949
      // (https://github.com/grpc/grpc-java/issues/2562).
1950
      if (!terminating) {
1✔
1951
        final class ShutdownSubchannel implements Runnable {
1✔
1952
          @Override
1953
          public void run() {
1954
            subchannel.shutdown(SUBCHANNEL_SHUTDOWN_STATUS);
1✔
1955
          }
1✔
1956
        }
1957

1958
        delayedShutdownTask = syncContext.schedule(
1✔
1959
            new LogExceptionRunnable(new ShutdownSubchannel()),
1960
            SUBCHANNEL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS,
1961
            transportFactory.getScheduledExecutorService());
1✔
1962
        return;
1✔
1963
      }
1964
      // When terminating == true, no more real streams will be created. It's safe and also
1965
      // desirable to shutdown timely.
1966
      subchannel.shutdown(SHUTDOWN_STATUS);
1✔
1967
    }
1✔
1968

1969
    @Override
1970
    public void requestConnection() {
1971
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1972
      checkState(started, "not started");
1✔
1973
      subchannel.obtainActiveTransport();
1✔
1974
    }
1✔
1975

1976
    @Override
1977
    public List<EquivalentAddressGroup> getAllAddresses() {
1978
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1979
      checkState(started, "not started");
1✔
1980
      return addressGroups;
1✔
1981
    }
1982

1983
    @Override
1984
    public Attributes getAttributes() {
1985
      return args.getAttributes();
1✔
1986
    }
1987

1988
    @Override
1989
    public String toString() {
1990
      return subchannelLogId.toString();
1✔
1991
    }
1992

1993
    @Override
1994
    public Channel asChannel() {
1995
      checkState(started, "not started");
1✔
1996
      return new SubchannelChannel(
1✔
1997
          subchannel, balancerRpcExecutorHolder.getExecutor(),
1✔
1998
          transportFactory.getScheduledExecutorService(),
1✔
1999
          callTracerFactory.create(),
1✔
2000
          new AtomicReference<InternalConfigSelector>(null));
2001
    }
2002

2003
    @Override
2004
    public Object getInternalSubchannel() {
2005
      checkState(started, "Subchannel is not started");
1✔
2006
      return subchannel;
1✔
2007
    }
2008

2009
    @Override
2010
    public ChannelLogger getChannelLogger() {
2011
      return subchannelLogger;
1✔
2012
    }
2013

2014
    @Override
2015
    public void updateAddresses(List<EquivalentAddressGroup> addrs) {
2016
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
2017
      addressGroups = addrs;
1✔
2018
      if (authorityOverride != null) {
1✔
2019
        addrs = stripOverrideAuthorityAttributes(addrs);
1✔
2020
      }
2021
      subchannel.updateAddresses(addrs);
1✔
2022
    }
1✔
2023

2024
    @Override
2025
    public Attributes getConnectedAddressAttributes() {
2026
      return subchannel.getConnectedAddressAttributes();
1✔
2027
    }
2028

2029
    private List<EquivalentAddressGroup> stripOverrideAuthorityAttributes(
2030
        List<EquivalentAddressGroup> eags) {
2031
      List<EquivalentAddressGroup> eagsWithoutOverrideAttr = new ArrayList<>();
1✔
2032
      for (EquivalentAddressGroup eag : eags) {
1✔
2033
        EquivalentAddressGroup eagWithoutOverrideAttr = new EquivalentAddressGroup(
1✔
2034
            eag.getAddresses(),
1✔
2035
            eag.getAttributes().toBuilder().discard(ATTR_AUTHORITY_OVERRIDE).build());
1✔
2036
        eagsWithoutOverrideAttr.add(eagWithoutOverrideAttr);
1✔
2037
      }
1✔
2038
      return Collections.unmodifiableList(eagsWithoutOverrideAttr);
1✔
2039
    }
2040
  }
2041

2042
  @Override
2043
  public String toString() {
2044
    return MoreObjects.toStringHelper(this)
1✔
2045
        .add("logId", logId.getId())
1✔
2046
        .add("target", target)
1✔
2047
        .toString();
1✔
2048
  }
2049

2050
  /**
2051
   * Called from syncContext.
2052
   */
2053
  private final class DelayedTransportListener implements ManagedClientTransport.Listener {
1✔
2054
    @Override
2055
    public void transportShutdown(Status s) {
2056
      checkState(shutdown.get(), "Channel must have been shut down");
1✔
2057
    }
1✔
2058

2059
    @Override
2060
    public void transportReady() {
2061
      // Don't care
2062
    }
×
2063

2064
    @Override
2065
    public Attributes filterTransport(Attributes attributes) {
2066
      return attributes;
×
2067
    }
2068

2069
    @Override
2070
    public void transportInUse(final boolean inUse) {
2071
      inUseStateAggregator.updateObjectInUse(delayedTransport, inUse);
1✔
2072
      if (inUse) {
1✔
2073
        // It's possible to be in idle mode while inUseStateAggregator is in-use, if one of the
2074
        // subchannels is in use. But we should never be in idle mode when delayed transport is in
2075
        // use.
2076
        exitIdleMode();
1✔
2077
      }
2078
    }
1✔
2079

2080
    @Override
2081
    public void transportTerminated() {
2082
      checkState(shutdown.get(), "Channel must have been shut down");
1✔
2083
      terminating = true;
1✔
2084
      shutdownNameResolverAndLoadBalancer(false);
1✔
2085
      // No need to call channelStateManager since we are already in SHUTDOWN state.
2086
      // Until LoadBalancer is shutdown, it may still create new subchannels.  We catch them
2087
      // here.
2088
      maybeShutdownNowSubchannels();
1✔
2089
      maybeTerminateChannel();
1✔
2090
    }
1✔
2091
  }
2092

2093
  /**
2094
   * Must be accessed from syncContext.
2095
   */
2096
  private final class IdleModeStateAggregator extends InUseStateAggregator<Object> {
1✔
2097
    @Override
2098
    protected void handleInUse() {
2099
      exitIdleMode();
1✔
2100
    }
1✔
2101

2102
    @Override
2103
    protected void handleNotInUse() {
2104
      if (shutdown.get()) {
1✔
2105
        return;
1✔
2106
      }
2107
      rescheduleIdleTimer();
1✔
2108
    }
1✔
2109
  }
2110

2111
  /**
2112
   * Lazily request for Executor from an executor pool.
2113
   * Also act as an Executor directly to simply run a cmd
2114
   */
2115
  @VisibleForTesting
2116
  static final class ExecutorHolder implements Executor {
2117
    private final ObjectPool<? extends Executor> pool;
2118
    private Executor executor;
2119

2120
    ExecutorHolder(ObjectPool<? extends Executor> executorPool) {
1✔
2121
      this.pool = checkNotNull(executorPool, "executorPool");
1✔
2122
    }
1✔
2123

2124
    synchronized Executor getExecutor() {
2125
      if (executor == null) {
1✔
2126
        executor = checkNotNull(pool.getObject(), "%s.getObject()", executor);
1✔
2127
      }
2128
      return executor;
1✔
2129
    }
2130

2131
    synchronized void release() {
2132
      if (executor != null) {
1✔
2133
        executor = pool.returnObject(executor);
1✔
2134
      }
2135
    }
1✔
2136

2137
    @Override
2138
    public void execute(Runnable command) {
2139
      getExecutor().execute(command);
1✔
2140
    }
1✔
2141
  }
2142

2143
  private static final class RestrictedScheduledExecutor implements ScheduledExecutorService {
2144
    final ScheduledExecutorService delegate;
2145

2146
    private RestrictedScheduledExecutor(ScheduledExecutorService delegate) {
1✔
2147
      this.delegate = checkNotNull(delegate, "delegate");
1✔
2148
    }
1✔
2149

2150
    @Override
2151
    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
2152
      return delegate.schedule(callable, delay, unit);
×
2153
    }
2154

2155
    @Override
2156
    public ScheduledFuture<?> schedule(Runnable cmd, long delay, TimeUnit unit) {
2157
      return delegate.schedule(cmd, delay, unit);
1✔
2158
    }
2159

2160
    @Override
2161
    public ScheduledFuture<?> scheduleAtFixedRate(
2162
        Runnable command, long initialDelay, long period, TimeUnit unit) {
2163
      return delegate.scheduleAtFixedRate(command, initialDelay, period, unit);
1✔
2164
    }
2165

2166
    @Override
2167
    public ScheduledFuture<?> scheduleWithFixedDelay(
2168
        Runnable command, long initialDelay, long delay, TimeUnit unit) {
2169
      return delegate.scheduleWithFixedDelay(command, initialDelay, delay, unit);
×
2170
    }
2171

2172
    @Override
2173
    public boolean awaitTermination(long timeout, TimeUnit unit)
2174
        throws InterruptedException {
2175
      return delegate.awaitTermination(timeout, unit);
×
2176
    }
2177

2178
    @Override
2179
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
2180
        throws InterruptedException {
2181
      return delegate.invokeAll(tasks);
×
2182
    }
2183

2184
    @Override
2185
    public <T> List<Future<T>> invokeAll(
2186
        Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
2187
        throws InterruptedException {
2188
      return delegate.invokeAll(tasks, timeout, unit);
×
2189
    }
2190

2191
    @Override
2192
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
2193
        throws InterruptedException, ExecutionException {
2194
      return delegate.invokeAny(tasks);
×
2195
    }
2196

2197
    @Override
2198
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
2199
        throws InterruptedException, ExecutionException, TimeoutException {
2200
      return delegate.invokeAny(tasks, timeout, unit);
×
2201
    }
2202

2203
    @Override
2204
    public boolean isShutdown() {
2205
      return delegate.isShutdown();
×
2206
    }
2207

2208
    @Override
2209
    public boolean isTerminated() {
2210
      return delegate.isTerminated();
×
2211
    }
2212

2213
    @Override
2214
    public void shutdown() {
2215
      throw new UnsupportedOperationException("Restricted: shutdown() is not allowed");
1✔
2216
    }
2217

2218
    @Override
2219
    public List<Runnable> shutdownNow() {
2220
      throw new UnsupportedOperationException("Restricted: shutdownNow() is not allowed");
1✔
2221
    }
2222

2223
    @Override
2224
    public <T> Future<T> submit(Callable<T> task) {
2225
      return delegate.submit(task);
×
2226
    }
2227

2228
    @Override
2229
    public Future<?> submit(Runnable task) {
2230
      return delegate.submit(task);
×
2231
    }
2232

2233
    @Override
2234
    public <T> Future<T> submit(Runnable task, T result) {
2235
      return delegate.submit(task, result);
×
2236
    }
2237

2238
    @Override
2239
    public void execute(Runnable command) {
2240
      delegate.execute(command);
×
2241
    }
×
2242
  }
2243

2244
  /**
   * A ResolutionState indicates the status of last name resolution.
   */
  enum ResolutionState {
    // Presumably the state before any resolution outcome has been recorded; confirm at the
    // field's declaration.
    NO_RESOLUTION,
    /** The last resolution delivered addresses. */
    SUCCESS,
    /** The last resolution ended in an error. */
    ERROR
  }
2252
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc