• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19870

17 Jun 2025 10:14PM UTC coverage: 88.543% (-0.01%) from 88.556%
#19870

push

github

ejona86
xds: Add logical dns cluster support to XdsDepManager

ClusterResolverLb gets the NameResolverRegistry from
LoadBalancer.Helper, so a new API was added in NameResolver.Args to
propagate the same object to the name resolver tree.

RetryingNameResolver was exposed to xds. This is expected to be
temporary, as the retrying is being removed from ManagedChannelImpl and
moved into the resolvers. At that point, DnsNameResolverProvider would
wrap DnsNameResolver with a similar API to RetryingNameResolver and xds
would no longer be responsible.

34637 of 39119 relevant lines covered (88.54%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.41
/../core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
1
/*
2
 * Copyright 2016 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static com.google.common.base.Preconditions.checkState;
22
import static io.grpc.ClientStreamTracer.NAME_RESOLUTION_DELAYED;
23
import static io.grpc.ConnectivityState.CONNECTING;
24
import static io.grpc.ConnectivityState.IDLE;
25
import static io.grpc.ConnectivityState.SHUTDOWN;
26
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
27
import static io.grpc.EquivalentAddressGroup.ATTR_AUTHORITY_OVERRIDE;
28

29
import com.google.common.annotations.VisibleForTesting;
30
import com.google.common.base.MoreObjects;
31
import com.google.common.base.Stopwatch;
32
import com.google.common.base.Supplier;
33
import com.google.common.util.concurrent.ListenableFuture;
34
import com.google.common.util.concurrent.SettableFuture;
35
import com.google.errorprone.annotations.concurrent.GuardedBy;
36
import io.grpc.Attributes;
37
import io.grpc.CallCredentials;
38
import io.grpc.CallOptions;
39
import io.grpc.Channel;
40
import io.grpc.ChannelCredentials;
41
import io.grpc.ChannelLogger;
42
import io.grpc.ChannelLogger.ChannelLogLevel;
43
import io.grpc.ClientCall;
44
import io.grpc.ClientInterceptor;
45
import io.grpc.ClientInterceptors;
46
import io.grpc.ClientStreamTracer;
47
import io.grpc.ClientTransportFilter;
48
import io.grpc.CompressorRegistry;
49
import io.grpc.ConnectivityState;
50
import io.grpc.ConnectivityStateInfo;
51
import io.grpc.Context;
52
import io.grpc.Deadline;
53
import io.grpc.DecompressorRegistry;
54
import io.grpc.EquivalentAddressGroup;
55
import io.grpc.ForwardingChannelBuilder2;
56
import io.grpc.ForwardingClientCall;
57
import io.grpc.Grpc;
58
import io.grpc.InternalChannelz;
59
import io.grpc.InternalChannelz.ChannelStats;
60
import io.grpc.InternalChannelz.ChannelTrace;
61
import io.grpc.InternalConfigSelector;
62
import io.grpc.InternalInstrumented;
63
import io.grpc.InternalLogId;
64
import io.grpc.InternalWithLogId;
65
import io.grpc.LoadBalancer;
66
import io.grpc.LoadBalancer.CreateSubchannelArgs;
67
import io.grpc.LoadBalancer.PickResult;
68
import io.grpc.LoadBalancer.PickSubchannelArgs;
69
import io.grpc.LoadBalancer.ResolvedAddresses;
70
import io.grpc.LoadBalancer.SubchannelPicker;
71
import io.grpc.LoadBalancer.SubchannelStateListener;
72
import io.grpc.ManagedChannel;
73
import io.grpc.ManagedChannelBuilder;
74
import io.grpc.Metadata;
75
import io.grpc.MethodDescriptor;
76
import io.grpc.MetricInstrumentRegistry;
77
import io.grpc.MetricRecorder;
78
import io.grpc.NameResolver;
79
import io.grpc.NameResolver.ConfigOrError;
80
import io.grpc.NameResolver.ResolutionResult;
81
import io.grpc.NameResolverProvider;
82
import io.grpc.NameResolverRegistry;
83
import io.grpc.ProxyDetector;
84
import io.grpc.Status;
85
import io.grpc.StatusOr;
86
import io.grpc.SynchronizationContext;
87
import io.grpc.SynchronizationContext.ScheduledHandle;
88
import io.grpc.internal.AutoConfiguredLoadBalancerFactory.AutoConfiguredLoadBalancer;
89
import io.grpc.internal.ClientCallImpl.ClientStreamProvider;
90
import io.grpc.internal.ClientTransportFactory.SwapChannelCredentialsResult;
91
import io.grpc.internal.ManagedChannelImplBuilder.ClientTransportFactoryBuilder;
92
import io.grpc.internal.ManagedChannelImplBuilder.FixedPortProvider;
93
import io.grpc.internal.ManagedChannelServiceConfig.MethodInfo;
94
import io.grpc.internal.ManagedChannelServiceConfig.ServiceConfigConvertedSelector;
95
import io.grpc.internal.RetriableStream.ChannelBufferMeter;
96
import io.grpc.internal.RetriableStream.Throttle;
97
import java.net.URI;
98
import java.util.ArrayList;
99
import java.util.Collection;
100
import java.util.Collections;
101
import java.util.HashSet;
102
import java.util.LinkedHashSet;
103
import java.util.List;
104
import java.util.Map;
105
import java.util.Set;
106
import java.util.concurrent.Callable;
107
import java.util.concurrent.CountDownLatch;
108
import java.util.concurrent.ExecutionException;
109
import java.util.concurrent.Executor;
110
import java.util.concurrent.Future;
111
import java.util.concurrent.ScheduledExecutorService;
112
import java.util.concurrent.ScheduledFuture;
113
import java.util.concurrent.TimeUnit;
114
import java.util.concurrent.TimeoutException;
115
import java.util.concurrent.atomic.AtomicBoolean;
116
import java.util.concurrent.atomic.AtomicReference;
117
import java.util.logging.Level;
118
import java.util.logging.Logger;
119
import javax.annotation.Nullable;
120
import javax.annotation.concurrent.ThreadSafe;
121

122
/** A communication channel for making outgoing RPCs. */
123
@ThreadSafe
124
final class ManagedChannelImpl extends ManagedChannel implements
125
    InternalInstrumented<ChannelStats> {
126
  @VisibleForTesting
127
  static final Logger logger = Logger.getLogger(ManagedChannelImpl.class.getName());
1✔
128

129
  static final long IDLE_TIMEOUT_MILLIS_DISABLE = -1;
130

131
  static final long SUBCHANNEL_SHUTDOWN_DELAY_SECONDS = 5;
132

133
  @VisibleForTesting
134
  static final Status SHUTDOWN_NOW_STATUS =
1✔
135
      Status.UNAVAILABLE.withDescription("Channel shutdownNow invoked");
1✔
136

137
  @VisibleForTesting
138
  static final Status SHUTDOWN_STATUS =
1✔
139
      Status.UNAVAILABLE.withDescription("Channel shutdown invoked");
1✔
140

141
  @VisibleForTesting
142
  static final Status SUBCHANNEL_SHUTDOWN_STATUS =
1✔
143
      Status.UNAVAILABLE.withDescription("Subchannel shutdown invoked");
1✔
144

145
  private static final ManagedChannelServiceConfig EMPTY_SERVICE_CONFIG =
146
      ManagedChannelServiceConfig.empty();
1✔
147
  private static final InternalConfigSelector INITIAL_PENDING_SELECTOR =
1✔
148
      new InternalConfigSelector() {
1✔
149
        @Override
150
        public Result selectConfig(PickSubchannelArgs args) {
151
          throw new IllegalStateException("Resolution is pending");
×
152
        }
153
      };
154
  private static final LoadBalancer.PickDetailsConsumer NOOP_PICK_DETAILS_CONSUMER =
1✔
155
      new LoadBalancer.PickDetailsConsumer() {};
1✔
156

157
  private final InternalLogId logId;
158
  private final String target;
159
  @Nullable
160
  private final String authorityOverride;
161
  private final NameResolverRegistry nameResolverRegistry;
162
  private final URI targetUri;
163
  private final NameResolverProvider nameResolverProvider;
164
  private final NameResolver.Args nameResolverArgs;
165
  private final AutoConfiguredLoadBalancerFactory loadBalancerFactory;
166
  private final ClientTransportFactory originalTransportFactory;
167
  @Nullable
168
  private final ChannelCredentials originalChannelCreds;
169
  private final ClientTransportFactory transportFactory;
170
  private final ClientTransportFactory oobTransportFactory;
171
  private final RestrictedScheduledExecutor scheduledExecutor;
172
  private final Executor executor;
173
  private final ObjectPool<? extends Executor> executorPool;
174
  private final ObjectPool<? extends Executor> balancerRpcExecutorPool;
175
  private final ExecutorHolder balancerRpcExecutorHolder;
176
  private final ExecutorHolder offloadExecutorHolder;
177
  private final TimeProvider timeProvider;
178
  private final int maxTraceEvents;
179

180
  @VisibleForTesting
1✔
181
  final SynchronizationContext syncContext = new SynchronizationContext(
182
      new Thread.UncaughtExceptionHandler() {
1✔
183
        @Override
184
        public void uncaughtException(Thread t, Throwable e) {
185
          logger.log(
1✔
186
              Level.SEVERE,
187
              "[" + getLogId() + "] Uncaught exception in the SynchronizationContext. Panic!",
1✔
188
              e);
189
          try {
190
            panic(e);
1✔
191
          } catch (Throwable anotherT) {
×
192
            logger.log(
×
193
                Level.SEVERE, "[" + getLogId() + "] Uncaught exception while panicking", anotherT);
×
194
          }
1✔
195
        }
1✔
196
      });
197

198
  private boolean fullStreamDecompression;
199

200
  private final DecompressorRegistry decompressorRegistry;
201
  private final CompressorRegistry compressorRegistry;
202

203
  private final Supplier<Stopwatch> stopwatchSupplier;
204
  /** The timeout before entering idle mode. */
205
  private final long idleTimeoutMillis;
206

207
  private final ConnectivityStateManager channelStateManager = new ConnectivityStateManager();
1✔
208
  private final BackoffPolicy.Provider backoffPolicyProvider;
209

210
  /**
211
   * We delegate to this channel, so that we can have interceptors as necessary. If there aren't
212
   * any interceptors and the {@link io.grpc.BinaryLog} is {@code null} then this will just be a
213
   * {@link RealChannel}.
214
   */
215
  private final Channel interceptorChannel;
216

217
  private final List<ClientTransportFilter> transportFilters;
218
  @Nullable private final String userAgent;
219

220
  // Only null after channel is terminated. Must be assigned from the syncContext.
221
  private NameResolver nameResolver;
222

223
  // Must be accessed from the syncContext.
224
  private boolean nameResolverStarted;
225

226
  // null when channel is in idle mode.  Must be assigned from syncContext.
227
  @Nullable
228
  private LbHelperImpl lbHelper;
229

230
  // Must be accessed from the syncContext
231
  private boolean panicMode;
232

233
  // Must be mutated from syncContext
234
  // If any monitoring hook to be added later needs to get a snapshot of this Set, we could
235
  // switch to a ConcurrentHashMap.
236
  private final Set<InternalSubchannel> subchannels = new HashSet<>(16, .75f);
1✔
237

238
  // Must be accessed from syncContext
239
  @Nullable
240
  private Collection<RealChannel.PendingCall<?, ?>> pendingCalls;
241
  private final Object pendingCallsInUseObject = new Object();
1✔
242

243
  // Must be mutated from syncContext
244
  private final Set<OobChannel> oobChannels = new HashSet<>(1, .75f);
1✔
245

246
  // reprocess() must be run from syncContext
247
  private final DelayedClientTransport delayedTransport;
248
  private final UncommittedRetriableStreamsRegistry uncommittedRetriableStreamsRegistry
1✔
249
      = new UncommittedRetriableStreamsRegistry();
250

251
  // Shutdown states.
252
  //
253
  // Channel's shutdown process:
254
  // 1. shutdown(): stop accepting new calls from applications
255
  //   1a shutdown <- true
256
  //   1b delayedTransport.shutdown()
257
  // 2. delayedTransport terminated: stop stream-creation functionality
258
  //   2a terminating <- true
259
  //   2b loadBalancer.shutdown()
260
  //     * LoadBalancer will shutdown subchannels and OOB channels
261
  //   2c loadBalancer <- null
262
  //   2d nameResolver.shutdown()
263
  //   2e nameResolver <- null
264
  // 3. All subchannels and OOB channels terminated: Channel considered terminated
265

266
  private final AtomicBoolean shutdown = new AtomicBoolean(false);
1✔
267
  // Must only be mutated and read from syncContext
268
  private boolean shutdownNowed;
269
  // Must only be mutated from syncContext
270
  private boolean terminating;
271
  // Must be mutated from syncContext
272
  private volatile boolean terminated;
273
  private final CountDownLatch terminatedLatch = new CountDownLatch(1);
1✔
274

275
  private final CallTracer.Factory callTracerFactory;
276
  private final CallTracer channelCallTracer;
277
  private final ChannelTracer channelTracer;
278
  private final ChannelLogger channelLogger;
279
  private final InternalChannelz channelz;
280
  private final RealChannel realChannel;
281
  // Must be mutated and read from syncContext
282
  // a flag for doing channel tracing when flipped
283
  private ResolutionState lastResolutionState = ResolutionState.NO_RESOLUTION;
1✔
284
  // Must be mutated and read from constructor or syncContext
285
  // used for channel tracing when value changed
286
  private ManagedChannelServiceConfig lastServiceConfig = EMPTY_SERVICE_CONFIG;
1✔
287

288
  @Nullable
289
  private final ManagedChannelServiceConfig defaultServiceConfig;
290
  // Must be mutated and read from constructor or syncContext
291
  private boolean serviceConfigUpdated = false;
1✔
292
  private final boolean lookUpServiceConfig;
293

294
  // One instance per channel.
295
  private final ChannelBufferMeter channelBufferUsed = new ChannelBufferMeter();
1✔
296

297
  private final long perRpcBufferLimit;
298
  private final long channelBufferLimit;
299

300
  // Temporary false flag that can skip the retry code path.
301
  private final boolean retryEnabled;
302

303
  private final Deadline.Ticker ticker = Deadline.getSystemTicker();
1✔
304

305
  // Called from syncContext
306
  private final ManagedClientTransport.Listener delayedTransportListener =
1✔
307
      new DelayedTransportListener();
308

309
  // If shutdownNowed is set, calls shutdownNow(SHUTDOWN_NOW_STATUS) on every InternalSubchannel
  // in subchannels and on the internal subchannel of every OobChannel in oobChannels.
  // Does nothing when shutdownNowed is false.
  // Must be called from syncContext
310
  private void maybeShutdownNowSubchannels() {
311
    if (shutdownNowed) {
1✔
312
      for (InternalSubchannel subchannel : subchannels) {
1✔
313
        subchannel.shutdownNow(SHUTDOWN_NOW_STATUS);
1✔
314
      }
1✔
315
      for (OobChannel oobChannel : oobChannels) {
1✔
316
        oobChannel.getInternalSubchannel().shutdownNow(SHUTDOWN_NOW_STATUS);
1✔
317
      }
1✔
318
    }
319
  }
1✔
320

321
  // Must be accessed from syncContext
322
  @VisibleForTesting
1✔
323
  final InUseStateAggregator<Object> inUseStateAggregator = new IdleModeStateAggregator();
324

325
  /**
   * Returns a future that is completed with a snapshot of this channel's stats. The snapshot is
   * built by a task handed to syncContext and combines the call tracer and channel tracer data,
   * the target, the current connectivity state, and the current subchannels and OOB channels.
   */
  @Override
326
  public ListenableFuture<ChannelStats> getStats() {
327
    final SettableFuture<ChannelStats> ret = SettableFuture.create();
1✔
328
    final class StatsFetcher implements Runnable {
1✔
329
      @Override
330
      public void run() {
331
        ChannelStats.Builder builder = new InternalChannelz.ChannelStats.Builder();
1✔
332
        channelCallTracer.updateBuilder(builder);
1✔
333
        channelTracer.updateBuilder(builder);
1✔
334
        builder.setTarget(target).setState(channelStateManager.getState());
1✔
335
        // Subchannels and OOB channels are reported together as one list of children.
        List<InternalWithLogId> children = new ArrayList<>();
1✔
336
        children.addAll(subchannels);
1✔
337
        children.addAll(oobChannels);
1✔
338
        builder.setSubchannels(children);
1✔
339
        ret.set(builder.build());
1✔
340
      }
1✔
341
    }
342

343
    // subchannels and oobchannels can only be accessed from syncContext
344
    syncContext.execute(new StatsFetcher());
1✔
345
    return ret;
1✔
346
  }
347

348
  /** Returns this channel's {@code logId}. */
  @Override
349
  public InternalLogId getLogId() {
350
    return logId;
1✔
351
  }
352

353
  // Timer task that puts the channel into idle mode by calling enterIdleMode(), unless lbHelper
  // is already null (the channel is already idle).
  // Run from syncContext
354
  private class IdleModeTimer implements Runnable {
1✔
355

356
    @Override
357
    public void run() {
358
      // Workaround timer scheduled while in idle mode. This can happen from handleNotInUse() after
359
      // an explicit enterIdleMode() by the user. Protecting here as other locations are a bit too
360
      // subtle to change rapidly to resolve the channel panic. See #8714
361
      if (lbHelper == null) {
1✔
362
        return;
×
363
      }
364
      enterIdleMode();
1✔
365
    }
1✔
366
  }
367

368
  // Shuts down the current name resolver and load balancer.
  //
  // When channelIsActive is true, the resolver must have been started and lbHelper must be
  // non-null (checkState fails otherwise), and the shut-down resolver is replaced by a fresh one
  // from getNameResolver(). When channelIsActive is false, nameResolver is set to null instead.
  // In both cases lbHelper is cleared after its load balancer has been shut down.
  // Must be called from syncContext
369
  private void shutdownNameResolverAndLoadBalancer(boolean channelIsActive) {
370
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
371
    if (channelIsActive) {
1✔
372
      checkState(nameResolverStarted, "nameResolver is not started");
1✔
373
      checkState(lbHelper != null, "lbHelper is null");
1✔
374
    }
375
    if (nameResolver != null) {
1✔
376
      nameResolver.shutdown();
1✔
377
      nameResolverStarted = false;
1✔
378
      if (channelIsActive) {
1✔
379
        nameResolver = getNameResolver(
1✔
380
            targetUri, authorityOverride, nameResolverProvider, nameResolverArgs);
381
      } else {
382
        nameResolver = null;
1✔
383
      }
384
    }
385
    if (lbHelper != null) {
1✔
386
      lbHelper.lb.shutdown();
1✔
387
      lbHelper = null;
1✔
388
    }
389
  }
1✔
390

391
  /**
392
   * Make the channel exit idle mode, if it's in it.
393
   *
   * <p>Returns without doing anything if the channel has been shut down or is in panic mode.
   * Otherwise the idle timer is cancelled (channel in use) or rescheduled (channel not in use).
   * If lbHelper is still null, a new load balancer is created, the channel state moves to
   * CONNECTING, and the name resolver is started.
   *
394
   * <p>Must be called from syncContext
395
   */
396
  @VisibleForTesting
397
  void exitIdleMode() {
398
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
399
    if (shutdown.get() || panicMode) {
1✔
400
      return;
1✔
401
    }
402
    if (inUseStateAggregator.isInUse()) {
1✔
403
      // Cancel the timer now, so that a racing due timer will not put Channel on idleness
404
      // when the caller of exitIdleMode() is about to use the returned loadBalancer.
405
      cancelIdleTimer(false);
1✔
406
    } else {
407
      // exitIdleMode() may be called outside of inUseStateAggregator.handleNotInUse() while
408
      // isInUse() == false, in which case we still need to schedule the timer.
409
      rescheduleIdleTimer();
1✔
410
    }
411
    // A non-null lbHelper means the channel is not idle, so there is nothing left to set up.
    if (lbHelper != null) {
1✔
412
      return;
1✔
413
    }
414
    channelLogger.log(ChannelLogLevel.INFO, "Exiting idle mode");
1✔
415
    LbHelperImpl lbHelper = new LbHelperImpl();
1✔
416
    lbHelper.lb = loadBalancerFactory.newLoadBalancer(lbHelper);
1✔
417
    // Delay setting lbHelper until fully initialized, since loadBalancerFactory is user code and
418
    // may throw. We don't want to confuse our state, even if we will enter panic mode.
419
    this.lbHelper = lbHelper;
1✔
420

421
    channelStateManager.gotoState(CONNECTING);
1✔
422
    NameResolverListener listener = new NameResolverListener(lbHelper, nameResolver);
1✔
423
    nameResolver.start(listener);
1✔
424
    nameResolverStarted = true;
1✔
425
  }
1✔
426

427
  // Puts the channel into idle mode: shuts down the name resolver and load balancer (treating the
  // channel as active), calls delayedTransport.reprocess(null), and moves the channel state to
  // IDLE. Exits idle mode again right away if pendingCallsInUseObject or delayedTransport is
  // still reported as in use.
  // Must be run from syncContext
428
  private void enterIdleMode() {
429
    // nameResolver and loadBalancer are guaranteed to be non-null.  If any of them were null,
430
    // either the idleModeTimer ran twice without exiting the idle mode, or the task in shutdown()
431
    // did not cancel idleModeTimer, or enterIdle() ran while shutdown or in idle, all of
432
    // which are bugs.
433
    shutdownNameResolverAndLoadBalancer(true);
1✔
434
    delayedTransport.reprocess(null);
1✔
435
    channelLogger.log(ChannelLogLevel.INFO, "Entering IDLE state");
1✔
436
    channelStateManager.gotoState(IDLE);
1✔
437
    // If the inUseStateAggregator still considers pending calls to be queued up or the delayed
438
    // transport to be holding some we need to exit idle mode to give these calls a chance to
439
    // be processed.
440
    if (inUseStateAggregator.anyObjectInUse(pendingCallsInUseObject, delayedTransport)) {
1✔
441
      exitIdleMode();
1✔
442
    }
443
  }
1✔
444

445
  // Cancels the idle timer, forwarding the permanent flag to idleTimer.cancel().
  // Must be run from syncContext
446
  private void cancelIdleTimer(boolean permanent) {
447
    idleTimer.cancel(permanent);
1✔
448
  }
1✔
449

450
  // Reschedules the idle timer to fire after idleTimeoutMillis. Does nothing when the idle
  // timeout is disabled (idleTimeoutMillis == IDLE_TIMEOUT_MILLIS_DISABLE).
  // Always run from syncContext
451
  private void rescheduleIdleTimer() {
452
    if (idleTimeoutMillis == IDLE_TIMEOUT_MILLIS_DISABLE) {
1✔
453
      return;
1✔
454
    }
455
    idleTimer.reschedule(idleTimeoutMillis, TimeUnit.MILLISECONDS);
1✔
456
  }
1✔
457

458
  /**
459
   * Force name resolution refresh to happen immediately. Must be run
460
   * from syncContext.
   *
   * <p>Does nothing if the name resolver has not been started.
461
   */
462
  private void refreshNameResolution() {
463
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
464
    if (nameResolverStarted) {
1✔
465
      nameResolver.refresh();
1✔
466
    }
467
  }
1✔
468

469
  // Creates the ClientStream for a call. With retries disabled, the stream is created directly on
  // delayedTransport. Otherwise a RetriableStream subclass is returned that registers itself in
  // uncommittedRetriableStreamsRegistry and creates each substream on delayedTransport.
  private final class ChannelStreamProvider implements ClientStreamProvider {
1✔
470
    // Throttle handed to every RetryStream created by newStream().
    volatile Throttle throttle;
471

472
    @Override
473
    public ClientStream newStream(
474
        final MethodDescriptor<?, ?> method,
475
        final CallOptions callOptions,
476
        final Metadata headers,
477
        final Context context) {
478
      // There is no need to reschedule the idle timer here. If the channel isn't shut down, either
479
      // the delayed transport or a real transport will go in-use and cancel the idle timer.
480
      if (!retryEnabled) {
1✔
481
        ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers(
1✔
482
            callOptions, headers, 0, /* isTransparentRetry= */ false);
483
        // Create the stream with the call's Context attached, restoring the previous one after.
        Context origContext = context.attach();
1✔
484
        try {
485
          return delayedTransport.newStream(method, headers, callOptions, tracers);
1✔
486
        } finally {
487
          context.detach(origContext);
1✔
488
        }
489
      } else {
490
        // Retry and hedging policies come from the MethodInfo call option, when one is present.
        MethodInfo methodInfo = callOptions.getOption(MethodInfo.KEY);
1✔
491
        final RetryPolicy retryPolicy = methodInfo == null ? null : methodInfo.retryPolicy;
1✔
492
        final HedgingPolicy hedgingPolicy = methodInfo == null ? null : methodInfo.hedgingPolicy;
1✔
493
        final class RetryStream<ReqT> extends RetriableStream<ReqT> {
494
          @SuppressWarnings("unchecked")
495
          RetryStream() {
1✔
496
            super(
1✔
497
                (MethodDescriptor<ReqT, ?>) method,
498
                headers,
499
                channelBufferUsed,
1✔
500
                perRpcBufferLimit,
1✔
501
                channelBufferLimit,
1✔
502
                getCallExecutor(callOptions),
1✔
503
                transportFactory.getScheduledExecutorService(),
1✔
504
                retryPolicy,
505
                hedgingPolicy,
506
                throttle);
507
          }
1✔
508

509
          @Override
510
          Status prestart() {
511
            return uncommittedRetriableStreamsRegistry.add(this);
1✔
512
          }
513

514
          @Override
515
          void postCommit() {
516
            uncommittedRetriableStreamsRegistry.remove(this);
1✔
517
          }
1✔
518

519
          @Override
520
          ClientStream newSubstream(
521
              Metadata newHeaders, ClientStreamTracer.Factory factory, int previousAttempts,
522
              boolean isTransparentRetry) {
523
            CallOptions newOptions = callOptions.withStreamTracerFactory(factory);
1✔
524
            ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers(
1✔
525
                newOptions, newHeaders, previousAttempts, isTransparentRetry);
526
            Context origContext = context.attach();
1✔
527
            try {
528
              return delayedTransport.newStream(method, newHeaders, newOptions, tracers);
1✔
529
            } finally {
530
              context.detach(origContext);
1✔
531
            }
532
          }
533
        }
534

535
        return new RetryStream<>();
1✔
536
      }
537
    }
538
  }
539

540
  private final ChannelStreamProvider transportProvider = new ChannelStreamProvider();
1✔
541

542
  private final Rescheduler idleTimer;
543
  private final MetricRecorder metricRecorder;
544

545
  // Builds the channel from the builder's settings. In order, it sets up: executors and transport
  // factories, channel tracing and logging, the load balancer factory, the name resolver args and
  // the name resolver, the delayed transport, the optional default service config, the real
  // channel with its binlog/interceptor wrapping, the idle timer, and registration with channelz.
  //
  // Fails with checkNotNull on missing required inputs, with checkState if the default service
  // config does not parse, and with checkArgument if an enabled idle timeout is below
  // ManagedChannelImplBuilder.IDLE_MODE_MIN_TIMEOUT_MILLIS.
  ManagedChannelImpl(
546
      ManagedChannelImplBuilder builder,
547
      ClientTransportFactory clientTransportFactory,
548
      URI targetUri,
549
      NameResolverProvider nameResolverProvider,
550
      BackoffPolicy.Provider backoffPolicyProvider,
551
      ObjectPool<? extends Executor> balancerRpcExecutorPool,
552
      Supplier<Stopwatch> stopwatchSupplier,
553
      List<ClientInterceptor> interceptors,
554
      final TimeProvider timeProvider) {
1✔
555
    this.target = checkNotNull(builder.target, "target");
1✔
556
    this.logId = InternalLogId.allocate("Channel", target);
1✔
557
    this.timeProvider = checkNotNull(timeProvider, "timeProvider");
1✔
558
    this.executorPool = checkNotNull(builder.executorPool, "executorPool");
1✔
559
    this.executor = checkNotNull(executorPool.getObject(), "executor");
1✔
560
    this.originalChannelCreds = builder.channelCredentials;
1✔
561
    this.originalTransportFactory = clientTransportFactory;
1✔
562
    this.offloadExecutorHolder =
1✔
563
        new ExecutorHolder(checkNotNull(builder.offloadExecutorPool, "offloadExecutorPool"));
1✔
564
    // The regular transport factory applies the builder's call credentials; the OOB one is
    // created with null call credentials.
    this.transportFactory = new CallCredentialsApplyingTransportFactory(
1✔
565
        clientTransportFactory, builder.callCredentials, this.offloadExecutorHolder);
566
    this.oobTransportFactory = new CallCredentialsApplyingTransportFactory(
1✔
567
        clientTransportFactory, null, this.offloadExecutorHolder);
568
    this.scheduledExecutor =
1✔
569
        new RestrictedScheduledExecutor(transportFactory.getScheduledExecutorService());
1✔
570
    maxTraceEvents = builder.maxTraceEvents;
1✔
571
    channelTracer = new ChannelTracer(
1✔
572
        logId, builder.maxTraceEvents, timeProvider.currentTimeNanos(),
1✔
573
        "Channel for '" + target + "'");
574
    channelLogger = new ChannelLoggerImpl(channelTracer, timeProvider);
1✔
575
    ProxyDetector proxyDetector =
576
        builder.proxyDetector != null ? builder.proxyDetector : GrpcUtil.DEFAULT_PROXY_DETECTOR;
1✔
577
    this.retryEnabled = builder.retryEnabled;
1✔
578
    this.loadBalancerFactory = new AutoConfiguredLoadBalancerFactory(builder.defaultLbPolicy);
1✔
579
    this.nameResolverRegistry = builder.nameResolverRegistry;
1✔
580
    this.targetUri = checkNotNull(targetUri, "targetUri");
1✔
581
    this.nameResolverProvider = checkNotNull(nameResolverProvider, "nameResolverProvider");
1✔
582
    ScParser serviceConfigParser =
1✔
583
        new ScParser(
584
            retryEnabled,
585
            builder.maxRetryAttempts,
586
            builder.maxHedgedAttempts,
587
            loadBalancerFactory);
588
    this.authorityOverride = builder.authorityOverride;
1✔
589
    this.metricRecorder = new MetricRecorderImpl(builder.metricSinks,
1✔
590
        MetricInstrumentRegistry.getDefaultRegistry());
1✔
591
    // These args are kept in nameResolverArgs so the same values can be reused whenever
    // getNameResolver() is called again for this channel.
    NameResolver.Args.Builder nameResolverArgsBuilder = NameResolver.Args.newBuilder()
1✔
592
            .setDefaultPort(builder.getDefaultPort())
1✔
593
            .setProxyDetector(proxyDetector)
1✔
594
            .setSynchronizationContext(syncContext)
1✔
595
            .setScheduledExecutorService(scheduledExecutor)
1✔
596
            .setServiceConfigParser(serviceConfigParser)
1✔
597
            .setChannelLogger(channelLogger)
1✔
598
            .setOffloadExecutor(this.offloadExecutorHolder)
1✔
599
            .setOverrideAuthority(this.authorityOverride)
1✔
600
            .setMetricRecorder(this.metricRecorder)
1✔
601
            .setNameResolverRegistry(builder.nameResolverRegistry);
1✔
602
    builder.copyAllNameResolverCustomArgsTo(nameResolverArgsBuilder);
1✔
603
    this.nameResolverArgs = nameResolverArgsBuilder.build();
1✔
604
    this.nameResolver = getNameResolver(
1✔
605
        targetUri, authorityOverride, nameResolverProvider, nameResolverArgs);
606
    this.balancerRpcExecutorPool = checkNotNull(balancerRpcExecutorPool, "balancerRpcExecutorPool");
1✔
607
    this.balancerRpcExecutorHolder = new ExecutorHolder(balancerRpcExecutorPool);
1✔
608
    this.delayedTransport = new DelayedClientTransport(this.executor, this.syncContext);
1✔
609
    this.delayedTransport.start(delayedTransportListener);
1✔
610
    this.backoffPolicyProvider = backoffPolicyProvider;
1✔
611

612
    // A default service config, when supplied, must parse cleanly; its retry throttling is
    // installed on the stream provider.
    if (builder.defaultServiceConfig != null) {
1✔
613
      ConfigOrError parsedDefaultServiceConfig =
1✔
614
          serviceConfigParser.parseServiceConfig(builder.defaultServiceConfig);
1✔
615
      checkState(
1✔
616
          parsedDefaultServiceConfig.getError() == null,
1✔
617
          "Default config is invalid: %s",
618
          parsedDefaultServiceConfig.getError());
1✔
619
      this.defaultServiceConfig =
1✔
620
          (ManagedChannelServiceConfig) parsedDefaultServiceConfig.getConfig();
1✔
621
      this.transportProvider.throttle = this.defaultServiceConfig.getRetryThrottling();
1✔
622
    } else {
1✔
623
      this.defaultServiceConfig = null;
1✔
624
    }
625
    this.lookUpServiceConfig = builder.lookUpServiceConfig;
1✔
626
    realChannel = new RealChannel(nameResolver.getServiceAuthority());
1✔
627
    // Wrap the real channel with the binlog (if any) and then with the interceptors.
    Channel channel = realChannel;
1✔
628
    if (builder.binlog != null) {
1✔
629
      channel = builder.binlog.wrapChannel(channel);
1✔
630
    }
631
    this.interceptorChannel = ClientInterceptors.intercept(channel, interceptors);
1✔
632
    this.transportFilters = new ArrayList<>(builder.transportFilters);
1✔
633
    this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier");
1✔
634
    // The "disabled" sentinel is accepted as-is; any other value must meet the minimum timeout.
    if (builder.idleTimeoutMillis == IDLE_TIMEOUT_MILLIS_DISABLE) {
1✔
635
      this.idleTimeoutMillis = builder.idleTimeoutMillis;
1✔
636
    } else {
637
      checkArgument(
1✔
638
          builder.idleTimeoutMillis
639
              >= ManagedChannelImplBuilder.IDLE_MODE_MIN_TIMEOUT_MILLIS,
640
          "invalid idleTimeoutMillis %s", builder.idleTimeoutMillis);
641
      this.idleTimeoutMillis = builder.idleTimeoutMillis;
1✔
642
    }
643

644
    idleTimer = new Rescheduler(
1✔
645
        new IdleModeTimer(),
646
        syncContext,
647
        transportFactory.getScheduledExecutorService(),
1✔
648
        stopwatchSupplier.get());
1✔
649
    this.fullStreamDecompression = builder.fullStreamDecompression;
1✔
650
    this.decompressorRegistry = checkNotNull(builder.decompressorRegistry, "decompressorRegistry");
1✔
651
    this.compressorRegistry = checkNotNull(builder.compressorRegistry, "compressorRegistry");
1✔
652
    this.userAgent = builder.userAgent;
1✔
653

654
    this.channelBufferLimit = builder.retryBufferSize;
1✔
655
    this.perRpcBufferLimit = builder.perRpcBufferLimit;
1✔
656
    final class ChannelCallTracerFactory implements CallTracer.Factory {
1✔
657
      @Override
658
      public CallTracer create() {
659
        return new CallTracer(timeProvider);
1✔
660
      }
661
    }
662

663
    this.callTracerFactory = new ChannelCallTracerFactory();
1✔
664
    channelCallTracer = callTracerFactory.create();
1✔
665
    this.channelz = checkNotNull(builder.channelz);
1✔
666
    channelz.addRootChannel(this);
1✔
667

668
    // With service config look-up disabled, the config is marked as already updated up front.
    if (!lookUpServiceConfig) {
1✔
669
      if (defaultServiceConfig != null) {
1✔
670
        channelLogger.log(
1✔
671
            ChannelLogLevel.INFO, "Service config look-up disabled, using default service config");
672
      }
673
      serviceConfigUpdated = true;
1✔
674
    }
675
  }
1✔
676

677
  /**
   * Creates the NameResolver for {@code targetUri} using {@code provider}, wrapped by
   * {@code RetryingNameResolver.wrap}. If {@code overrideAuthority} is non-null, the result is
   * wrapped once more so that {@code getServiceAuthority()} returns the override.
   *
   * @throws IllegalArgumentException if the provider returns null for the target
   */
  @VisibleForTesting
678
  static NameResolver getNameResolver(
679
      URI targetUri, @Nullable final String overrideAuthority,
680
      NameResolverProvider provider, NameResolver.Args nameResolverArgs) {
681
    NameResolver resolver = provider.newNameResolver(targetUri, nameResolverArgs);
1✔
682
    if (resolver == null) {
1✔
683
      throw new IllegalArgumentException("cannot create a NameResolver for " + targetUri);
1✔
684
    }
685

686
    // We wrap the name resolver in a RetryingNameResolver to give it the ability to retry failures.
687
    // TODO: After a transition period, all NameResolver implementations that need retry should use
688
    //       RetryingNameResolver directly and this step can be removed.
689
    NameResolver usedNameResolver = RetryingNameResolver.wrap(resolver, nameResolverArgs);
1✔
690

691
    if (overrideAuthority == null) {
1✔
692
      return usedNameResolver;
1✔
693
    }
694

695
    // Only getServiceAuthority() is overridden in this forwarding wrapper.
    return new ForwardingNameResolver(usedNameResolver) {
1✔
696
      @Override
697
      public String getServiceAuthority() {
698
        return overrideAuthority;
1✔
699
      }
700
    };
701
  }
702

703
  @VisibleForTesting
704
  InternalConfigSelector getConfigSelector() {
705
    return realChannel.configSelector.get();
1✔
706
  }
707
  
708
  @VisibleForTesting
709
  boolean hasThrottle() {
710
    return this.transportProvider.throttle != null;
1✔
711
  }
712

713
  /**
714
   * Initiates an orderly shutdown in which preexisting calls continue but new calls are immediately
715
   * cancelled.
716
   */
717
  @Override
718
  public ManagedChannelImpl shutdown() {
719
    channelLogger.log(ChannelLogLevel.DEBUG, "shutdown() called");
1✔
720
    if (!shutdown.compareAndSet(false, true)) {
1✔
721
      return this;
1✔
722
    }
723
    final class Shutdown implements Runnable {
1✔
724
      @Override
725
      public void run() {
726
        channelLogger.log(ChannelLogLevel.INFO, "Entering SHUTDOWN state");
1✔
727
        channelStateManager.gotoState(SHUTDOWN);
1✔
728
      }
1✔
729
    }
730

731
    syncContext.execute(new Shutdown());
1✔
732
    realChannel.shutdown();
1✔
733
    final class CancelIdleTimer implements Runnable {
1✔
734
      @Override
735
      public void run() {
736
        cancelIdleTimer(/* permanent= */ true);
1✔
737
      }
1✔
738
    }
739

740
    syncContext.execute(new CancelIdleTimer());
1✔
741
    return this;
1✔
742
  }
743

744
  /**
745
   * Initiates a forceful shutdown in which preexisting and new calls are cancelled. Although
746
   * forceful, the shutdown process is still not instantaneous; {@link #isTerminated()} will likely
747
   * return {@code false} immediately after this method returns.
748
   */
749
  @Override
750
  public ManagedChannelImpl shutdownNow() {
751
    channelLogger.log(ChannelLogLevel.DEBUG, "shutdownNow() called");
1✔
752
    shutdown();
1✔
753
    realChannel.shutdownNow();
1✔
754
    final class ShutdownNow implements Runnable {
1✔
755
      @Override
756
      public void run() {
757
        if (shutdownNowed) {
1✔
758
          return;
1✔
759
        }
760
        shutdownNowed = true;
1✔
761
        maybeShutdownNowSubchannels();
1✔
762
      }
1✔
763
    }
764

765
    syncContext.execute(new ShutdownNow());
1✔
766
    return this;
1✔
767
  }
768

769
  // Called from syncContext
770
  @VisibleForTesting
771
  void panic(final Throwable t) {
772
    if (panicMode) {
1✔
773
      // Preserve the first panic information
774
      return;
×
775
    }
776
    panicMode = true;
1✔
777
    try {
778
      cancelIdleTimer(/* permanent= */ true);
1✔
779
      shutdownNameResolverAndLoadBalancer(false);
1✔
780
    } finally {
781
      updateSubchannelPicker(new LoadBalancer.FixedResultPicker(PickResult.withDrop(
1✔
782
          Status.INTERNAL.withDescription("Panic! This is a bug!").withCause(t))));
1✔
783
      realChannel.updateConfigSelector(null);
1✔
784
      channelLogger.log(ChannelLogLevel.ERROR, "PANIC! Entering TRANSIENT_FAILURE");
1✔
785
      channelStateManager.gotoState(TRANSIENT_FAILURE);
1✔
786
    }
787
  }
1✔
788

789
  @VisibleForTesting
790
  boolean isInPanicMode() {
791
    return panicMode;
1✔
792
  }
793

794
  // Called from syncContext
795
  private void updateSubchannelPicker(SubchannelPicker newPicker) {
796
    delayedTransport.reprocess(newPicker);
1✔
797
  }
1✔
798

799
  @Override
800
  public boolean isShutdown() {
801
    return shutdown.get();
1✔
802
  }
803

804
  @Override
805
  public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
806
    return terminatedLatch.await(timeout, unit);
1✔
807
  }
808

809
  @Override
810
  public boolean isTerminated() {
811
    return terminated;
1✔
812
  }
813

814
  /*
815
   * Creates a new outgoing call on the channel.
816
   */
817
  @Override
818
  public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(MethodDescriptor<ReqT, RespT> method,
819
      CallOptions callOptions) {
820
    return interceptorChannel.newCall(method, callOptions);
1✔
821
  }
822

823
  @Override
824
  public String authority() {
825
    return interceptorChannel.authority();
1✔
826
  }
827

828
  private Executor getCallExecutor(CallOptions callOptions) {
829
    Executor executor = callOptions.getExecutor();
1✔
830
    if (executor == null) {
1✔
831
      executor = this.executor;
1✔
832
    }
833
    return executor;
1✔
834
  }
835

836
  private class RealChannel extends Channel {
837
    // Reference to null if no config selector is available from resolution result
838
    // Reference must be set() from syncContext
839
    private final AtomicReference<InternalConfigSelector> configSelector =
1✔
840
        new AtomicReference<>(INITIAL_PENDING_SELECTOR);
1✔
841
    // Set when the NameResolver is initially created. When we create a new NameResolver for the
842
    // same target, the new instance must have the same value.
843
    private final String authority;
844

845
    private final Channel clientCallImplChannel = new Channel() {
1✔
846
      @Override
847
      public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
848
          MethodDescriptor<RequestT, ResponseT> method, CallOptions callOptions) {
849
        return new ClientCallImpl<>(
1✔
850
            method,
851
            getCallExecutor(callOptions),
1✔
852
            callOptions,
853
            transportProvider,
1✔
854
            terminated ? null : transportFactory.getScheduledExecutorService(),
1✔
855
            channelCallTracer,
1✔
856
            null)
857
            .setFullStreamDecompression(fullStreamDecompression)
1✔
858
            .setDecompressorRegistry(decompressorRegistry)
1✔
859
            .setCompressorRegistry(compressorRegistry);
1✔
860
      }
861

862
      @Override
863
      public String authority() {
864
        return authority;
×
865
      }
866
    };
867

868
    private RealChannel(String authority) {
1✔
869
      this.authority =  checkNotNull(authority, "authority");
1✔
870
    }
1✔
871

872
    @Override
873
    public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(
874
        MethodDescriptor<ReqT, RespT> method, CallOptions callOptions) {
875
      if (configSelector.get() != INITIAL_PENDING_SELECTOR) {
1✔
876
        return newClientCall(method, callOptions);
1✔
877
      }
878
      syncContext.execute(new Runnable() {
1✔
879
        @Override
880
        public void run() {
881
          exitIdleMode();
1✔
882
        }
1✔
883
      });
884
      if (configSelector.get() != INITIAL_PENDING_SELECTOR) {
1✔
885
        // This is an optimization for the case (typically with InProcessTransport) when name
886
        // resolution result is immediately available at this point. Otherwise, some users'
887
        // tests might observe slight behavior difference from earlier grpc versions.
888
        return newClientCall(method, callOptions);
1✔
889
      }
890
      if (shutdown.get()) {
1✔
891
        // Return a failing ClientCall.
892
        return new ClientCall<ReqT, RespT>() {
×
893
          @Override
894
          public void start(Listener<RespT> responseListener, Metadata headers) {
895
            responseListener.onClose(SHUTDOWN_STATUS, new Metadata());
×
896
          }
×
897

898
          @Override public void request(int numMessages) {}
×
899

900
          @Override public void cancel(@Nullable String message, @Nullable Throwable cause) {}
×
901

902
          @Override public void halfClose() {}
×
903

904
          @Override public void sendMessage(ReqT message) {}
×
905
        };
906
      }
907
      Context context = Context.current();
1✔
908
      final PendingCall<ReqT, RespT> pendingCall = new PendingCall<>(context, method, callOptions);
1✔
909
      syncContext.execute(new Runnable() {
1✔
910
        @Override
911
        public void run() {
912
          if (configSelector.get() == INITIAL_PENDING_SELECTOR) {
1✔
913
            if (pendingCalls == null) {
1✔
914
              pendingCalls = new LinkedHashSet<>();
1✔
915
              inUseStateAggregator.updateObjectInUse(pendingCallsInUseObject, true);
1✔
916
            }
917
            pendingCalls.add(pendingCall);
1✔
918
          } else {
919
            pendingCall.reprocess();
1✔
920
          }
921
        }
1✔
922
      });
923
      return pendingCall;
1✔
924
    }
925

926
    // Must run in SynchronizationContext.
927
    void updateConfigSelector(@Nullable InternalConfigSelector config) {
928
      InternalConfigSelector prevConfig = configSelector.get();
1✔
929
      configSelector.set(config);
1✔
930
      if (prevConfig == INITIAL_PENDING_SELECTOR && pendingCalls != null) {
1✔
931
        for (RealChannel.PendingCall<?, ?> pendingCall : pendingCalls) {
1✔
932
          pendingCall.reprocess();
1✔
933
        }
1✔
934
      }
935
    }
1✔
936

937
    // Must run in SynchronizationContext.
938
    void onConfigError() {
939
      if (configSelector.get() == INITIAL_PENDING_SELECTOR) {
1✔
940
        // Apply Default Service Config if initial name resolution fails.
941
        if (defaultServiceConfig != null) {
1✔
942
          updateConfigSelector(defaultServiceConfig.getDefaultConfigSelector());
1✔
943
          lastServiceConfig = defaultServiceConfig;
1✔
944
          channelLogger.log(ChannelLogLevel.ERROR,
1✔
945
              "Initial Name Resolution error, using default service config");
946
        } else {
947
          updateConfigSelector(null);
1✔
948
        }
949
      }
950
    }
1✔
951

952
    void shutdown() {
953
      final class RealChannelShutdown implements Runnable {
1✔
954
        @Override
955
        public void run() {
956
          if (pendingCalls == null) {
1✔
957
            if (configSelector.get() == INITIAL_PENDING_SELECTOR) {
1✔
958
              configSelector.set(null);
1✔
959
            }
960
            uncommittedRetriableStreamsRegistry.onShutdown(SHUTDOWN_STATUS);
1✔
961
          }
962
        }
1✔
963
      }
964

965
      syncContext.execute(new RealChannelShutdown());
1✔
966
    }
1✔
967

968
    void shutdownNow() {
969
      final class RealChannelShutdownNow implements Runnable {
1✔
970
        @Override
971
        public void run() {
972
          if (configSelector.get() == INITIAL_PENDING_SELECTOR) {
1✔
973
            configSelector.set(null);
1✔
974
          }
975
          if (pendingCalls != null) {
1✔
976
            for (RealChannel.PendingCall<?, ?> pendingCall : pendingCalls) {
1✔
977
              pendingCall.cancel("Channel is forcefully shutdown", null);
1✔
978
            }
1✔
979
          }
980
          uncommittedRetriableStreamsRegistry.onShutdownNow(SHUTDOWN_NOW_STATUS);
1✔
981
        }
1✔
982
      }
983

984
      syncContext.execute(new RealChannelShutdownNow());
1✔
985
    }
1✔
986

987
    @Override
988
    public String authority() {
989
      return authority;
1✔
990
    }
991

992
    private final class PendingCall<ReqT, RespT> extends DelayedClientCall<ReqT, RespT> {
993
      final Context context;
994
      final MethodDescriptor<ReqT, RespT> method;
995
      final CallOptions callOptions;
996
      private final long callCreationTime;
997

998
      PendingCall(
999
          Context context, MethodDescriptor<ReqT, RespT> method, CallOptions callOptions) {
1✔
1000
        super(getCallExecutor(callOptions), scheduledExecutor, callOptions.getDeadline());
1✔
1001
        this.context = context;
1✔
1002
        this.method = method;
1✔
1003
        this.callOptions = callOptions;
1✔
1004
        this.callCreationTime = ticker.nanoTime();
1✔
1005
      }
1✔
1006

1007
      /** Called when it's ready to create a real call and reprocess the pending call. */
1008
      void reprocess() {
1009
        ClientCall<ReqT, RespT> realCall;
1010
        Context previous = context.attach();
1✔
1011
        try {
1012
          CallOptions delayResolutionOption = callOptions.withOption(NAME_RESOLUTION_DELAYED,
1✔
1013
              ticker.nanoTime() - callCreationTime);
1✔
1014
          realCall = newClientCall(method, delayResolutionOption);
1✔
1015
        } finally {
1016
          context.detach(previous);
1✔
1017
        }
1018
        Runnable toRun = setCall(realCall);
1✔
1019
        if (toRun == null) {
1✔
1020
          syncContext.execute(new PendingCallRemoval());
1✔
1021
        } else {
1022
          getCallExecutor(callOptions).execute(new Runnable() {
1✔
1023
            @Override
1024
            public void run() {
1025
              toRun.run();
1✔
1026
              syncContext.execute(new PendingCallRemoval());
1✔
1027
            }
1✔
1028
          });
1029
        }
1030
      }
1✔
1031

1032
      @Override
1033
      protected void callCancelled() {
1034
        super.callCancelled();
1✔
1035
        syncContext.execute(new PendingCallRemoval());
1✔
1036
      }
1✔
1037

1038
      final class PendingCallRemoval implements Runnable {
1✔
1039
        @Override
1040
        public void run() {
1041
          if (pendingCalls != null) {
1✔
1042
            pendingCalls.remove(PendingCall.this);
1✔
1043
            if (pendingCalls.isEmpty()) {
1✔
1044
              inUseStateAggregator.updateObjectInUse(pendingCallsInUseObject, false);
1✔
1045
              pendingCalls = null;
1✔
1046
              if (shutdown.get()) {
1✔
1047
                uncommittedRetriableStreamsRegistry.onShutdown(SHUTDOWN_STATUS);
1✔
1048
              }
1049
            }
1050
          }
1051
        }
1✔
1052
      }
1053
    }
1054

1055
    private <ReqT, RespT> ClientCall<ReqT, RespT> newClientCall(
1056
        MethodDescriptor<ReqT, RespT> method, CallOptions callOptions) {
1057
      InternalConfigSelector selector = configSelector.get();
1✔
1058
      if (selector == null) {
1✔
1059
        return clientCallImplChannel.newCall(method, callOptions);
1✔
1060
      }
1061
      if (selector instanceof ServiceConfigConvertedSelector) {
1✔
1062
        MethodInfo methodInfo =
1✔
1063
            ((ServiceConfigConvertedSelector) selector).config.getMethodConfig(method);
1✔
1064
        if (methodInfo != null) {
1✔
1065
          callOptions = callOptions.withOption(MethodInfo.KEY, methodInfo);
1✔
1066
        }
1067
        return clientCallImplChannel.newCall(method, callOptions);
1✔
1068
      }
1069
      return new ConfigSelectingClientCall<>(
1✔
1070
          selector, clientCallImplChannel, executor, method, callOptions);
1✔
1071
    }
1072
  }
1073

1074
  /**
1075
   * A client call for a given channel that applies a given config selector when it starts.
1076
   */
1077
  static final class ConfigSelectingClientCall<ReqT, RespT>
1078
      extends ForwardingClientCall<ReqT, RespT> {
1079

1080
    private final InternalConfigSelector configSelector;
1081
    private final Channel channel;
1082
    private final Executor callExecutor;
1083
    private final MethodDescriptor<ReqT, RespT> method;
1084
    private final Context context;
1085
    private CallOptions callOptions;
1086

1087
    private ClientCall<ReqT, RespT> delegate;
1088

1089
    ConfigSelectingClientCall(
1090
        InternalConfigSelector configSelector, Channel channel, Executor channelExecutor,
1091
        MethodDescriptor<ReqT, RespT> method,
1092
        CallOptions callOptions) {
1✔
1093
      this.configSelector = configSelector;
1✔
1094
      this.channel = channel;
1✔
1095
      this.method = method;
1✔
1096
      this.callExecutor =
1✔
1097
          callOptions.getExecutor() == null ? channelExecutor : callOptions.getExecutor();
1✔
1098
      this.callOptions = callOptions.withExecutor(callExecutor);
1✔
1099
      this.context = Context.current();
1✔
1100
    }
1✔
1101

1102
    @Override
1103
    protected ClientCall<ReqT, RespT> delegate() {
1104
      return delegate;
1✔
1105
    }
1106

1107
    @SuppressWarnings("unchecked")
1108
    @Override
1109
    public void start(Listener<RespT> observer, Metadata headers) {
1110
      PickSubchannelArgs args =
1✔
1111
          new PickSubchannelArgsImpl(method, headers, callOptions, NOOP_PICK_DETAILS_CONSUMER);
1✔
1112
      InternalConfigSelector.Result result = configSelector.selectConfig(args);
1✔
1113
      Status status = result.getStatus();
1✔
1114
      if (!status.isOk()) {
1✔
1115
        executeCloseObserverInContext(observer,
1✔
1116
            GrpcUtil.replaceInappropriateControlPlaneStatus(status));
1✔
1117
        delegate = (ClientCall<ReqT, RespT>) NOOP_CALL;
1✔
1118
        return;
1✔
1119
      }
1120
      ClientInterceptor interceptor = result.getInterceptor();
1✔
1121
      ManagedChannelServiceConfig config = (ManagedChannelServiceConfig) result.getConfig();
1✔
1122
      MethodInfo methodInfo = config.getMethodConfig(method);
1✔
1123
      if (methodInfo != null) {
1✔
1124
        callOptions = callOptions.withOption(MethodInfo.KEY, methodInfo);
1✔
1125
      }
1126
      if (interceptor != null) {
1✔
1127
        delegate = interceptor.interceptCall(method, callOptions, channel);
1✔
1128
      } else {
1129
        delegate = channel.newCall(method, callOptions);
×
1130
      }
1131
      delegate.start(observer, headers);
1✔
1132
    }
1✔
1133

1134
    private void executeCloseObserverInContext(
1135
        final Listener<RespT> observer, final Status status) {
1136
      class CloseInContext extends ContextRunnable {
1137
        CloseInContext() {
1✔
1138
          super(context);
1✔
1139
        }
1✔
1140

1141
        @Override
1142
        public void runInContext() {
1143
          observer.onClose(status, new Metadata());
1✔
1144
        }
1✔
1145
      }
1146

1147
      callExecutor.execute(new CloseInContext());
1✔
1148
    }
1✔
1149

1150
    @Override
1151
    public void cancel(@Nullable String message, @Nullable Throwable cause) {
1152
      if (delegate != null) {
×
1153
        delegate.cancel(message, cause);
×
1154
      }
1155
    }
×
1156
  }
1157

1158
  private static final ClientCall<Object, Object> NOOP_CALL = new ClientCall<Object, Object>() {
1✔
1159
    @Override
1160
    public void start(Listener<Object> responseListener, Metadata headers) {}
×
1161

1162
    @Override
1163
    public void request(int numMessages) {}
1✔
1164

1165
    @Override
1166
    public void cancel(String message, Throwable cause) {}
×
1167

1168
    @Override
1169
    public void halfClose() {}
×
1170

1171
    @Override
1172
    public void sendMessage(Object message) {}
×
1173

1174
    // Always returns {@code false}, since this is only used when the startup of the call fails.
1175
    @Override
1176
    public boolean isReady() {
1177
      return false;
×
1178
    }
1179
  };
1180

1181
  /**
1182
   * Terminate the channel if termination conditions are met.
1183
   */
1184
  // Must be run from syncContext
1185
  private void maybeTerminateChannel() {
1186
    if (terminated) {
1✔
1187
      return;
×
1188
    }
1189
    if (shutdown.get() && subchannels.isEmpty() && oobChannels.isEmpty()) {
1✔
1190
      channelLogger.log(ChannelLogLevel.INFO, "Terminated");
1✔
1191
      channelz.removeRootChannel(this);
1✔
1192
      executorPool.returnObject(executor);
1✔
1193
      balancerRpcExecutorHolder.release();
1✔
1194
      offloadExecutorHolder.release();
1✔
1195
      // Release the transport factory so that it can deallocate any resources.
1196
      transportFactory.close();
1✔
1197

1198
      terminated = true;
1✔
1199
      terminatedLatch.countDown();
1✔
1200
    }
1201
  }
1✔
1202

1203
  // Must be called from syncContext
1204
  private void handleInternalSubchannelState(ConnectivityStateInfo newState) {
1205
    if (newState.getState() == TRANSIENT_FAILURE || newState.getState() == IDLE) {
1✔
1206
      refreshNameResolution();
1✔
1207
    }
1208
  }
1✔
1209

1210
  @Override
1211
  public ConnectivityState getState(boolean requestConnection) {
1212
    ConnectivityState savedChannelState = channelStateManager.getState();
1✔
1213
    if (requestConnection && savedChannelState == IDLE) {
1✔
1214
      final class RequestConnection implements Runnable {
1✔
1215
        @Override
1216
        public void run() {
1217
          exitIdleMode();
1✔
1218
          if (lbHelper != null) {
1✔
1219
            lbHelper.lb.requestConnection();
1✔
1220
          }
1221
        }
1✔
1222
      }
1223

1224
      syncContext.execute(new RequestConnection());
1✔
1225
    }
1226
    return savedChannelState;
1✔
1227
  }
1228

1229
  @Override
1230
  public void notifyWhenStateChanged(final ConnectivityState source, final Runnable callback) {
1231
    final class NotifyStateChanged implements Runnable {
1✔
1232
      @Override
1233
      public void run() {
1234
        channelStateManager.notifyWhenStateChanged(callback, executor, source);
1✔
1235
      }
1✔
1236
    }
1237

1238
    syncContext.execute(new NotifyStateChanged());
1✔
1239
  }
1✔
1240

1241
  @Override
1242
  public void resetConnectBackoff() {
1243
    final class ResetConnectBackoff implements Runnable {
1✔
1244
      @Override
1245
      public void run() {
1246
        if (shutdown.get()) {
1✔
1247
          return;
1✔
1248
        }
1249
        if (nameResolverStarted) {
1✔
1250
          refreshNameResolution();
1✔
1251
        }
1252
        for (InternalSubchannel subchannel : subchannels) {
1✔
1253
          subchannel.resetConnectBackoff();
1✔
1254
        }
1✔
1255
        for (OobChannel oobChannel : oobChannels) {
1✔
1256
          oobChannel.resetConnectBackoff();
×
1257
        }
×
1258
      }
1✔
1259
    }
1260

1261
    syncContext.execute(new ResetConnectBackoff());
1✔
1262
  }
1✔
1263

1264
  @Override
1265
  public void enterIdle() {
1266
    final class PrepareToLoseNetworkRunnable implements Runnable {
1✔
1267
      @Override
1268
      public void run() {
1269
        if (shutdown.get() || lbHelper == null) {
1✔
1270
          return;
1✔
1271
        }
1272
        cancelIdleTimer(/* permanent= */ false);
1✔
1273
        enterIdleMode();
1✔
1274
      }
1✔
1275
    }
1276

1277
    syncContext.execute(new PrepareToLoseNetworkRunnable());
1✔
1278
  }
1✔
1279

1280
  /**
1281
   * A registry that prevents channel shutdown from killing existing retry attempts that are in
1282
   * backoff.
1283
   */
1284
  private final class UncommittedRetriableStreamsRegistry {
1✔
1285
    // TODO(zdapeng): This means we would acquire a lock for each new retry-able stream,
1286
    // it's worthwhile to look for a lock-free approach.
1287
    final Object lock = new Object();
1✔
1288

1289
    @GuardedBy("lock")
1✔
1290
    Collection<ClientStream> uncommittedRetriableStreams = new HashSet<>();
1291

1292
    @GuardedBy("lock")
1293
    Status shutdownStatus;
1294

1295
    void onShutdown(Status reason) {
1296
      boolean shouldShutdownDelayedTransport = false;
1✔
1297
      synchronized (lock) {
1✔
1298
        if (shutdownStatus != null) {
1✔
1299
          return;
1✔
1300
        }
1301
        shutdownStatus = reason;
1✔
1302
        // Keep the delayedTransport open until there is no more uncommitted streams, b/c those
1303
        // retriable streams, which may be in backoff and not using any transport, are already
1304
        // started RPCs.
1305
        if (uncommittedRetriableStreams.isEmpty()) {
1✔
1306
          shouldShutdownDelayedTransport = true;
1✔
1307
        }
1308
      }
1✔
1309

1310
      if (shouldShutdownDelayedTransport) {
1✔
1311
        delayedTransport.shutdown(reason);
1✔
1312
      }
1313
    }
1✔
1314

1315
    void onShutdownNow(Status reason) {
1316
      onShutdown(reason);
1✔
1317
      Collection<ClientStream> streams;
1318

1319
      synchronized (lock) {
1✔
1320
        streams = new ArrayList<>(uncommittedRetriableStreams);
1✔
1321
      }
1✔
1322

1323
      for (ClientStream stream : streams) {
1✔
1324
        stream.cancel(reason);
1✔
1325
      }
1✔
1326
      delayedTransport.shutdownNow(reason);
1✔
1327
    }
1✔
1328

1329
    /**
1330
     * Registers a RetriableStream and return null if not shutdown, otherwise just returns the
1331
     * shutdown Status.
1332
     */
1333
    @Nullable
1334
    Status add(RetriableStream<?> retriableStream) {
1335
      synchronized (lock) {
1✔
1336
        if (shutdownStatus != null) {
1✔
1337
          return shutdownStatus;
1✔
1338
        }
1339
        uncommittedRetriableStreams.add(retriableStream);
1✔
1340
        return null;
1✔
1341
      }
1342
    }
1343

1344
    void remove(RetriableStream<?> retriableStream) {
1345
      Status shutdownStatusCopy = null;
1✔
1346

1347
      synchronized (lock) {
1✔
1348
        uncommittedRetriableStreams.remove(retriableStream);
1✔
1349
        if (uncommittedRetriableStreams.isEmpty()) {
1✔
1350
          shutdownStatusCopy = shutdownStatus;
1✔
1351
          // Because retriable transport is long-lived, we take this opportunity to down-size the
1352
          // hashmap.
1353
          uncommittedRetriableStreams = new HashSet<>();
1✔
1354
        }
1355
      }
1✔
1356

1357
      if (shutdownStatusCopy != null) {
1✔
1358
        delayedTransport.shutdown(shutdownStatusCopy);
1✔
1359
      }
1360
    }
1✔
1361
  }
1362

1363
  private final class LbHelperImpl extends LoadBalancer.Helper {
1✔
1364
    AutoConfiguredLoadBalancer lb;
1365

1366
    @Override
1367
    public AbstractSubchannel createSubchannel(CreateSubchannelArgs args) {
1368
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1369
      // No new subchannel should be created after load balancer has been shutdown.
1370
      checkState(!terminating, "Channel is being terminated");
1✔
1371
      return new SubchannelImpl(args);
1✔
1372
    }
1373

1374
    @Override
1375
    public void updateBalancingState(
1376
        final ConnectivityState newState, final SubchannelPicker newPicker) {
1377
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1378
      checkNotNull(newState, "newState");
1✔
1379
      checkNotNull(newPicker, "newPicker");
1✔
1380

1381
      if (LbHelperImpl.this != lbHelper || panicMode) {
1✔
1382
        return;
1✔
1383
      }
1384
      updateSubchannelPicker(newPicker);
1✔
1385
      // It's not appropriate to report SHUTDOWN state from lb.
1386
      // Ignore the case of newState == SHUTDOWN for now.
1387
      if (newState != SHUTDOWN) {
1✔
1388
        channelLogger.log(
1✔
1389
            ChannelLogLevel.INFO, "Entering {0} state with picker: {1}", newState, newPicker);
1390
        channelStateManager.gotoState(newState);
1✔
1391
      }
1392
    }
1✔
1393

1394
    @Override
1395
    public void refreshNameResolution() {
1396
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1397
      final class LoadBalancerRefreshNameResolution implements Runnable {
1✔
1398
        @Override
1399
        public void run() {
1400
          ManagedChannelImpl.this.refreshNameResolution();
1✔
1401
        }
1✔
1402
      }
1403

1404
      syncContext.execute(new LoadBalancerRefreshNameResolution());
1✔
1405
    }
1✔
1406

1407
    @Override
1408
    public ManagedChannel createOobChannel(EquivalentAddressGroup addressGroup, String authority) {
1409
      return createOobChannel(Collections.singletonList(addressGroup), authority);
×
1410
    }
1411

1412
    @Override
1413
    public ManagedChannel createOobChannel(List<EquivalentAddressGroup> addressGroup,
1414
        String authority) {
1415
      // TODO(ejona): can we be even stricter? Like terminating?
1416
      checkState(!terminated, "Channel is terminated");
1✔
1417
      long oobChannelCreationTime = timeProvider.currentTimeNanos();
1✔
1418
      InternalLogId oobLogId = InternalLogId.allocate("OobChannel", /*details=*/ null);
1✔
1419
      InternalLogId subchannelLogId =
1✔
1420
          InternalLogId.allocate("Subchannel-OOB", /*details=*/ authority);
1✔
1421
      ChannelTracer oobChannelTracer =
1✔
1422
          new ChannelTracer(
1423
              oobLogId, maxTraceEvents, oobChannelCreationTime,
1✔
1424
              "OobChannel for " + addressGroup);
1425
      final OobChannel oobChannel = new OobChannel(
1✔
1426
          authority, balancerRpcExecutorPool, oobTransportFactory.getScheduledExecutorService(),
1✔
1427
          syncContext, callTracerFactory.create(), oobChannelTracer, channelz, timeProvider);
1✔
1428
      channelTracer.reportEvent(new ChannelTrace.Event.Builder()
1✔
1429
          .setDescription("Child OobChannel created")
1✔
1430
          .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
1✔
1431
          .setTimestampNanos(oobChannelCreationTime)
1✔
1432
          .setChannelRef(oobChannel)
1✔
1433
          .build());
1✔
1434
      ChannelTracer subchannelTracer =
1✔
1435
          new ChannelTracer(subchannelLogId, maxTraceEvents, oobChannelCreationTime,
1✔
1436
              "Subchannel for " + addressGroup);
1437
      ChannelLogger subchannelLogger = new ChannelLoggerImpl(subchannelTracer, timeProvider);
1✔
1438
      final class ManagedOobChannelCallback extends InternalSubchannel.Callback {
1✔
1439
        @Override
1440
        void onTerminated(InternalSubchannel is) {
1441
          oobChannels.remove(oobChannel);
1✔
1442
          channelz.removeSubchannel(is);
1✔
1443
          oobChannel.handleSubchannelTerminated();
1✔
1444
          maybeTerminateChannel();
1✔
1445
        }
1✔
1446

1447
        @Override
1448
        void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) {
1449
          // TODO(chengyuanzhang): change to let LB policies explicitly manage OOB channel's
1450
          //  state and refresh name resolution if necessary.
1451
          handleInternalSubchannelState(newState);
1✔
1452
          oobChannel.handleSubchannelStateChange(newState);
1✔
1453
        }
1✔
1454
      }
1455

1456
      final InternalSubchannel internalSubchannel = new InternalSubchannel(
1✔
1457
          CreateSubchannelArgs.newBuilder().setAddresses(addressGroup).build(),
1✔
1458
          authority, userAgent, backoffPolicyProvider, oobTransportFactory,
1✔
1459
          oobTransportFactory.getScheduledExecutorService(), stopwatchSupplier, syncContext,
1✔
1460
          // All callback methods are run from syncContext
1461
          new ManagedOobChannelCallback(),
1462
          channelz,
1✔
1463
          callTracerFactory.create(),
1✔
1464
          subchannelTracer,
1465
          subchannelLogId,
1466
          subchannelLogger,
1467
          transportFilters);
1✔
1468
      oobChannelTracer.reportEvent(new ChannelTrace.Event.Builder()
1✔
1469
          .setDescription("Child Subchannel created")
1✔
1470
          .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
1✔
1471
          .setTimestampNanos(oobChannelCreationTime)
1✔
1472
          .setSubchannelRef(internalSubchannel)
1✔
1473
          .build());
1✔
1474
      channelz.addSubchannel(oobChannel);
1✔
1475
      channelz.addSubchannel(internalSubchannel);
1✔
1476
      oobChannel.setSubchannel(internalSubchannel);
1✔
1477
      final class AddOobChannel implements Runnable {
1✔
1478
        @Override
1479
        public void run() {
1480
          if (terminating) {
1✔
1481
            oobChannel.shutdown();
×
1482
          }
1483
          if (!terminated) {
1✔
1484
            // If channel has not terminated, it will track the subchannel and block termination
1485
            // for it.
1486
            oobChannels.add(oobChannel);
1✔
1487
          }
1488
        }
1✔
1489
      }
1490

1491
      syncContext.execute(new AddOobChannel());
1✔
1492
      return oobChannel;
1✔
1493
    }
1494

1495
    @Deprecated
1496
    @Override
1497
    public ManagedChannelBuilder<?> createResolvingOobChannelBuilder(String target) {
1498
      return createResolvingOobChannelBuilder(target, new DefaultChannelCreds())
1✔
1499
          // Override authority to keep the old behavior.
1500
          // createResolvingOobChannelBuilder(String target) will be deleted soon.
1501
          .overrideAuthority(getAuthority());
1✔
1502
    }
1503

1504
    // TODO(creamsoup) prevent main channel to shutdown if oob channel is not terminated
1505
    // TODO(zdapeng) register the channel as a subchannel of the parent channel in channelz.
1506
    @Override
1507
    public ManagedChannelBuilder<?> createResolvingOobChannelBuilder(
1508
        final String target, final ChannelCredentials channelCreds) {
1509
      checkNotNull(channelCreds, "channelCreds");
1✔
1510

1511
      final class ResolvingOobChannelBuilder
1512
          extends ForwardingChannelBuilder2<ResolvingOobChannelBuilder> {
1513
        final ManagedChannelBuilder<?> delegate;
1514

1515
        ResolvingOobChannelBuilder() {
1✔
1516
          final ClientTransportFactory transportFactory;
1517
          CallCredentials callCredentials;
1518
          if (channelCreds instanceof DefaultChannelCreds) {
1✔
1519
            transportFactory = originalTransportFactory;
1✔
1520
            callCredentials = null;
1✔
1521
          } else {
1522
            SwapChannelCredentialsResult swapResult =
1✔
1523
                originalTransportFactory.swapChannelCredentials(channelCreds);
1✔
1524
            if (swapResult == null) {
1✔
1525
              delegate = Grpc.newChannelBuilder(target, channelCreds);
×
1526
              return;
×
1527
            } else {
1528
              transportFactory = swapResult.transportFactory;
1✔
1529
              callCredentials = swapResult.callCredentials;
1✔
1530
            }
1531
          }
1532
          ClientTransportFactoryBuilder transportFactoryBuilder =
1✔
1533
              new ClientTransportFactoryBuilder() {
1✔
1534
                @Override
1535
                public ClientTransportFactory buildClientTransportFactory() {
1536
                  return transportFactory;
1✔
1537
                }
1538
              };
1539
          delegate = new ManagedChannelImplBuilder(
1✔
1540
              target,
1541
              channelCreds,
1542
              callCredentials,
1543
              transportFactoryBuilder,
1544
              new FixedPortProvider(nameResolverArgs.getDefaultPort()))
1✔
1545
              .nameResolverRegistry(nameResolverRegistry);
1✔
1546
        }
1✔
1547

1548
        @Override
1549
        protected ManagedChannelBuilder<?> delegate() {
1550
          return delegate;
1✔
1551
        }
1552
      }
1553

1554
      checkState(!terminated, "Channel is terminated");
1✔
1555

1556
      ResolvingOobChannelBuilder builder = new ResolvingOobChannelBuilder();
1✔
1557

1558
      return builder
1✔
1559
          // TODO(zdapeng): executors should not outlive the parent channel.
1560
          .executor(executor)
1✔
1561
          .offloadExecutor(offloadExecutorHolder.getExecutor())
1✔
1562
          .maxTraceEvents(maxTraceEvents)
1✔
1563
          .proxyDetector(nameResolverArgs.getProxyDetector())
1✔
1564
          .userAgent(userAgent);
1✔
1565
    }
1566

1567
    @Override
1568
    public ChannelCredentials getUnsafeChannelCredentials() {
1569
      if (originalChannelCreds == null) {
1✔
1570
        return new DefaultChannelCreds();
1✔
1571
      }
1572
      return originalChannelCreds;
×
1573
    }
1574

1575
    @Override
1576
    public void updateOobChannelAddresses(ManagedChannel channel, EquivalentAddressGroup eag) {
1577
      updateOobChannelAddresses(channel, Collections.singletonList(eag));
×
1578
    }
×
1579

1580
    @Override
1581
    public void updateOobChannelAddresses(ManagedChannel channel,
1582
        List<EquivalentAddressGroup> eag) {
1583
      checkArgument(channel instanceof OobChannel,
1✔
1584
          "channel must have been returned from createOobChannel");
1585
      ((OobChannel) channel).updateAddresses(eag);
1✔
1586
    }
1✔
1587

1588
    @Override
1589
    public String getAuthority() {
1590
      return ManagedChannelImpl.this.authority();
1✔
1591
    }
1592

1593
    @Override
1594
    public String getChannelTarget() {
1595
      return targetUri.toString();
1✔
1596
    }
1597

1598
    @Override
1599
    public SynchronizationContext getSynchronizationContext() {
1600
      return syncContext;
1✔
1601
    }
1602

1603
    @Override
1604
    public ScheduledExecutorService getScheduledExecutorService() {
1605
      return scheduledExecutor;
1✔
1606
    }
1607

1608
    @Override
1609
    public ChannelLogger getChannelLogger() {
1610
      return channelLogger;
1✔
1611
    }
1612

1613
    @Override
1614
    public NameResolver.Args getNameResolverArgs() {
1615
      return nameResolverArgs;
1✔
1616
    }
1617

1618
    @Override
1619
    public NameResolverRegistry getNameResolverRegistry() {
1620
      return nameResolverRegistry;
1✔
1621
    }
1622

1623
    @Override
1624
    public MetricRecorder getMetricRecorder() {
1625
      return metricRecorder;
1✔
1626
    }
1627

1628
    /**
1629
     * A placeholder for channel creds if user did not specify channel creds for the channel.
1630
     */
1631
    // TODO(zdapeng): get rid of this class and let all ChannelBuilders always provide a non-null
1632
    //     channel creds.
1633
    final class DefaultChannelCreds extends ChannelCredentials {
1✔
1634
      @Override
1635
      public ChannelCredentials withoutBearerTokens() {
1636
        return this;
×
1637
      }
1638
    }
1639
  }
1640

1641
  final class NameResolverListener extends NameResolver.Listener2 {
1642
    final LbHelperImpl helper;
1643
    final NameResolver resolver;
1644

1645
    NameResolverListener(LbHelperImpl helperImpl, NameResolver resolver) {
1✔
1646
      this.helper = checkNotNull(helperImpl, "helperImpl");
1✔
1647
      this.resolver = checkNotNull(resolver, "resolver");
1✔
1648
    }
1✔
1649

1650
    @Override
1651
    public void onResult(final ResolutionResult resolutionResult) {
1652
      syncContext.execute(() -> onResult2(resolutionResult));
×
1653
    }
×
1654

1655
    @SuppressWarnings("ReferenceEquality")
1656
    @Override
1657
    public Status onResult2(final ResolutionResult resolutionResult) {
1658
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1659
      if (ManagedChannelImpl.this.nameResolver != resolver) {
1✔
1660
        return Status.OK;
1✔
1661
      }
1662

1663
      StatusOr<List<EquivalentAddressGroup>> serversOrError =
1✔
1664
          resolutionResult.getAddressesOrError();
1✔
1665
      if (!serversOrError.hasValue()) {
1✔
1666
        handleErrorInSyncContext(serversOrError.getStatus());
1✔
1667
        return serversOrError.getStatus();
1✔
1668
      }
1669
      List<EquivalentAddressGroup> servers = serversOrError.getValue();
1✔
1670
      channelLogger.log(
1✔
1671
          ChannelLogLevel.DEBUG,
1672
          "Resolved address: {0}, config={1}",
1673
          servers,
1674
          resolutionResult.getAttributes());
1✔
1675

1676
      if (lastResolutionState != ResolutionState.SUCCESS) {
1✔
1677
        channelLogger.log(ChannelLogLevel.INFO, "Address resolved: {0}",
1✔
1678
            servers);
1679
        lastResolutionState = ResolutionState.SUCCESS;
1✔
1680
      }
1681
      ConfigOrError configOrError = resolutionResult.getServiceConfig();
1✔
1682
      InternalConfigSelector resolvedConfigSelector =
1✔
1683
          resolutionResult.getAttributes().get(InternalConfigSelector.KEY);
1✔
1684
      ManagedChannelServiceConfig validServiceConfig =
1685
          configOrError != null && configOrError.getConfig() != null
1✔
1686
              ? (ManagedChannelServiceConfig) configOrError.getConfig()
1✔
1687
              : null;
1✔
1688
      Status serviceConfigError = configOrError != null ? configOrError.getError() : null;
1✔
1689

1690
      ManagedChannelServiceConfig effectiveServiceConfig;
1691
      if (!lookUpServiceConfig) {
1✔
1692
        if (validServiceConfig != null) {
1✔
1693
          channelLogger.log(
1✔
1694
              ChannelLogLevel.INFO,
1695
              "Service config from name resolver discarded by channel settings");
1696
        }
1697
        effectiveServiceConfig =
1698
            defaultServiceConfig == null ? EMPTY_SERVICE_CONFIG : defaultServiceConfig;
1✔
1699
        if (resolvedConfigSelector != null) {
1✔
1700
          channelLogger.log(
1✔
1701
              ChannelLogLevel.INFO,
1702
              "Config selector from name resolver discarded by channel settings");
1703
        }
1704
        realChannel.updateConfigSelector(effectiveServiceConfig.getDefaultConfigSelector());
1✔
1705
      } else {
1706
        // Try to use config if returned from name resolver
1707
        // Otherwise, try to use the default config if available
1708
        if (validServiceConfig != null) {
1✔
1709
          effectiveServiceConfig = validServiceConfig;
1✔
1710
          if (resolvedConfigSelector != null) {
1✔
1711
            realChannel.updateConfigSelector(resolvedConfigSelector);
1✔
1712
            if (effectiveServiceConfig.getDefaultConfigSelector() != null) {
1✔
1713
              channelLogger.log(
×
1714
                  ChannelLogLevel.DEBUG,
1715
                  "Method configs in service config will be discarded due to presence of"
1716
                      + "config-selector");
1717
            }
1718
          } else {
1719
            realChannel.updateConfigSelector(effectiveServiceConfig.getDefaultConfigSelector());
1✔
1720
          }
1721
        } else if (defaultServiceConfig != null) {
1✔
1722
          effectiveServiceConfig = defaultServiceConfig;
1✔
1723
          realChannel.updateConfigSelector(effectiveServiceConfig.getDefaultConfigSelector());
1✔
1724
          channelLogger.log(
1✔
1725
              ChannelLogLevel.INFO,
1726
              "Received no service config, using default service config");
1727
        } else if (serviceConfigError != null) {
1✔
1728
          if (!serviceConfigUpdated) {
1✔
1729
            // First DNS lookup has invalid service config, and cannot fall back to default
1730
            channelLogger.log(
1✔
1731
                ChannelLogLevel.INFO,
1732
                "Fallback to error due to invalid first service config without default config");
1733
            // This error could be an "inappropriate" control plane error that should not bleed
1734
            // through to client code using gRPC. We let them flow through here to the LB as
1735
            // we later check for these error codes when investigating pick results in
1736
            // GrpcUtil.getTransportFromPickResult().
1737
            onError(configOrError.getError());
1✔
1738
            return configOrError.getError();
1✔
1739
          } else {
1740
            effectiveServiceConfig = lastServiceConfig;
1✔
1741
          }
1742
        } else {
1743
          effectiveServiceConfig = EMPTY_SERVICE_CONFIG;
1✔
1744
          realChannel.updateConfigSelector(null);
1✔
1745
        }
1746
        if (!effectiveServiceConfig.equals(lastServiceConfig)) {
1✔
1747
          channelLogger.log(
1✔
1748
              ChannelLogLevel.INFO,
1749
              "Service config changed{0}",
1750
              effectiveServiceConfig == EMPTY_SERVICE_CONFIG ? " to empty" : "");
1✔
1751
          lastServiceConfig = effectiveServiceConfig;
1✔
1752
          transportProvider.throttle = effectiveServiceConfig.getRetryThrottling();
1✔
1753
        }
1754

1755
        try {
1756
          // TODO(creamsoup): when `serversOrError` is empty and lastResolutionStateCopy == SUCCESS
1757
          //  and lbNeedAddress, it shouldn't call the handleServiceConfigUpdate. But,
1758
          //  lbNeedAddress is not deterministic
1759
          serviceConfigUpdated = true;
1✔
1760
        } catch (RuntimeException re) {
×
1761
          logger.log(
×
1762
              Level.WARNING,
1763
              "[" + getLogId() + "] Unexpected exception from parsing service config",
×
1764
              re);
1765
        }
1✔
1766
      }
1767

1768
      Attributes effectiveAttrs = resolutionResult.getAttributes();
1✔
1769
      // Call LB only if it's not shutdown.  If LB is shutdown, lbHelper won't match.
1770
      if (NameResolverListener.this.helper == ManagedChannelImpl.this.lbHelper) {
1✔
1771
        Attributes.Builder attrBuilder =
1✔
1772
            effectiveAttrs.toBuilder().discard(InternalConfigSelector.KEY);
1✔
1773
        Map<String, ?> healthCheckingConfig =
1✔
1774
            effectiveServiceConfig.getHealthCheckingConfig();
1✔
1775
        if (healthCheckingConfig != null) {
1✔
1776
          attrBuilder
1✔
1777
              .set(LoadBalancer.ATTR_HEALTH_CHECKING_CONFIG, healthCheckingConfig)
1✔
1778
              .build();
1✔
1779
        }
1780
        Attributes attributes = attrBuilder.build();
1✔
1781

1782
        ResolvedAddresses.Builder resolvedAddresses = ResolvedAddresses.newBuilder()
1✔
1783
            .setAddresses(serversOrError.getValue())
1✔
1784
            .setAttributes(attributes)
1✔
1785
            .setLoadBalancingPolicyConfig(effectiveServiceConfig.getLoadBalancingConfig());
1✔
1786
        Status addressAcceptanceStatus = helper.lb.tryAcceptResolvedAddresses(
1✔
1787
            resolvedAddresses.build());
1✔
1788
        return addressAcceptanceStatus;
1✔
1789
      }
1790
      return Status.OK;
×
1791
    }
1792

1793
    @Override
1794
    public void onError(final Status error) {
1795
      checkArgument(!error.isOk(), "the error status must not be OK");
1✔
1796
      final class NameResolverErrorHandler implements Runnable {
1✔
1797
        @Override
1798
        public void run() {
1799
          handleErrorInSyncContext(error);
1✔
1800
        }
1✔
1801
      }
1802

1803
      syncContext.execute(new NameResolverErrorHandler());
1✔
1804
    }
1✔
1805

1806
    private void handleErrorInSyncContext(Status error) {
1807
      logger.log(Level.WARNING, "[{0}] Failed to resolve name. status={1}",
1✔
1808
          new Object[] {getLogId(), error});
1✔
1809
      realChannel.onConfigError();
1✔
1810
      if (lastResolutionState != ResolutionState.ERROR) {
1✔
1811
        channelLogger.log(ChannelLogLevel.WARNING, "Failed to resolve name: {0}", error);
1✔
1812
        lastResolutionState = ResolutionState.ERROR;
1✔
1813
      }
1814
      // Call LB only if it's not shutdown.  If LB is shutdown, lbHelper won't match.
1815
      if (NameResolverListener.this.helper != ManagedChannelImpl.this.lbHelper) {
1✔
1816
        return;
1✔
1817
      }
1818

1819
      helper.lb.handleNameResolutionError(error);
1✔
1820
    }
1✔
1821
  }
1822

1823
  private final class SubchannelImpl extends AbstractSubchannel {
1824
    final CreateSubchannelArgs args;
1825
    final InternalLogId subchannelLogId;
1826
    final ChannelLoggerImpl subchannelLogger;
1827
    final ChannelTracer subchannelTracer;
1828
    List<EquivalentAddressGroup> addressGroups;
1829
    InternalSubchannel subchannel;
1830
    boolean started;
1831
    boolean shutdown;
1832
    ScheduledHandle delayedShutdownTask;
1833

1834
    SubchannelImpl(CreateSubchannelArgs args) {
1✔
1835
      checkNotNull(args, "args");
1✔
1836
      addressGroups = args.getAddresses();
1✔
1837
      if (authorityOverride != null) {
1✔
1838
        List<EquivalentAddressGroup> eagsWithoutOverrideAttr =
1✔
1839
            stripOverrideAuthorityAttributes(args.getAddresses());
1✔
1840
        args = args.toBuilder().setAddresses(eagsWithoutOverrideAttr).build();
1✔
1841
      }
1842
      this.args = args;
1✔
1843
      subchannelLogId = InternalLogId.allocate("Subchannel", /*details=*/ authority());
1✔
1844
      subchannelTracer = new ChannelTracer(
1✔
1845
          subchannelLogId, maxTraceEvents, timeProvider.currentTimeNanos(),
1✔
1846
          "Subchannel for " + args.getAddresses());
1✔
1847
      subchannelLogger = new ChannelLoggerImpl(subchannelTracer, timeProvider);
1✔
1848
    }
1✔
1849

1850
    @Override
1851
    public void start(final SubchannelStateListener listener) {
1852
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1853
      checkState(!started, "already started");
1✔
1854
      checkState(!shutdown, "already shutdown");
1✔
1855
      checkState(!terminating, "Channel is being terminated");
1✔
1856
      started = true;
1✔
1857
      final class ManagedInternalSubchannelCallback extends InternalSubchannel.Callback {
1✔
1858
        // All callbacks are run in syncContext
1859
        @Override
1860
        void onTerminated(InternalSubchannel is) {
1861
          subchannels.remove(is);
1✔
1862
          channelz.removeSubchannel(is);
1✔
1863
          maybeTerminateChannel();
1✔
1864
        }
1✔
1865

1866
        @Override
1867
        void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) {
1868
          checkState(listener != null, "listener is null");
1✔
1869
          listener.onSubchannelState(newState);
1✔
1870
        }
1✔
1871

1872
        @Override
1873
        void onInUse(InternalSubchannel is) {
1874
          inUseStateAggregator.updateObjectInUse(is, true);
1✔
1875
        }
1✔
1876

1877
        @Override
1878
        void onNotInUse(InternalSubchannel is) {
1879
          inUseStateAggregator.updateObjectInUse(is, false);
1✔
1880
        }
1✔
1881
      }
1882

1883
      final InternalSubchannel internalSubchannel = new InternalSubchannel(
1✔
1884
          args,
1885
          authority(),
1✔
1886
          userAgent,
1✔
1887
          backoffPolicyProvider,
1✔
1888
          transportFactory,
1✔
1889
          transportFactory.getScheduledExecutorService(),
1✔
1890
          stopwatchSupplier,
1✔
1891
          syncContext,
1892
          new ManagedInternalSubchannelCallback(),
1893
          channelz,
1✔
1894
          callTracerFactory.create(),
1✔
1895
          subchannelTracer,
1896
          subchannelLogId,
1897
          subchannelLogger,
1898
          transportFilters);
1✔
1899

1900
      channelTracer.reportEvent(new ChannelTrace.Event.Builder()
1✔
1901
          .setDescription("Child Subchannel started")
1✔
1902
          .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
1✔
1903
          .setTimestampNanos(timeProvider.currentTimeNanos())
1✔
1904
          .setSubchannelRef(internalSubchannel)
1✔
1905
          .build());
1✔
1906

1907
      this.subchannel = internalSubchannel;
1✔
1908
      channelz.addSubchannel(internalSubchannel);
1✔
1909
      subchannels.add(internalSubchannel);
1✔
1910
    }
1✔
1911

1912
    @Override
1913
    InternalInstrumented<ChannelStats> getInstrumentedInternalSubchannel() {
1914
      checkState(started, "not started");
1✔
1915
      return subchannel;
1✔
1916
    }
1917

1918
    @Override
1919
    public void shutdown() {
1920
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1921
      if (subchannel == null) {
1✔
1922
        // start() was not successful
1923
        shutdown = true;
×
1924
        return;
×
1925
      }
1926
      if (shutdown) {
1✔
1927
        if (terminating && delayedShutdownTask != null) {
1✔
1928
          // shutdown() was previously called when terminating == false, thus a delayed shutdown()
1929
          // was scheduled.  Now since terminating == true, We should expedite the shutdown.
1930
          delayedShutdownTask.cancel();
×
1931
          delayedShutdownTask = null;
×
1932
          // Will fall through to the subchannel.shutdown() at the end.
1933
        } else {
1934
          return;
1✔
1935
        }
1936
      } else {
1937
        shutdown = true;
1✔
1938
      }
1939
      // Add a delay to shutdown to deal with the race between 1) a transport being picked and
1940
      // newStream() being called on it, and 2) its Subchannel is shut down by LoadBalancer (e.g.,
1941
      // because of address change, or because LoadBalancer is shutdown by Channel entering idle
1942
      // mode). If (2) wins, the app will see a spurious error. We work around this by delaying
1943
      // shutdown of Subchannel for a few seconds here.
1944
      //
1945
      // TODO(zhangkun83): consider a better approach
1946
      // (https://github.com/grpc/grpc-java/issues/2562).
1947
      if (!terminating) {
1✔
1948
        final class ShutdownSubchannel implements Runnable {
1✔
1949
          @Override
1950
          public void run() {
1951
            subchannel.shutdown(SUBCHANNEL_SHUTDOWN_STATUS);
1✔
1952
          }
1✔
1953
        }
1954

1955
        delayedShutdownTask = syncContext.schedule(
1✔
1956
            new LogExceptionRunnable(new ShutdownSubchannel()),
1957
            SUBCHANNEL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS,
1958
            transportFactory.getScheduledExecutorService());
1✔
1959
        return;
1✔
1960
      }
1961
      // When terminating == true, no more real streams will be created. It's safe and also
1962
      // desirable to shutdown timely.
1963
      subchannel.shutdown(SHUTDOWN_STATUS);
1✔
1964
    }
1✔
1965

1966
    @Override
1967
    public void requestConnection() {
1968
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1969
      checkState(started, "not started");
1✔
1970
      subchannel.obtainActiveTransport();
1✔
1971
    }
1✔
1972

1973
    @Override
1974
    public List<EquivalentAddressGroup> getAllAddresses() {
1975
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1976
      checkState(started, "not started");
1✔
1977
      return addressGroups;
1✔
1978
    }
1979

1980
    @Override
1981
    public Attributes getAttributes() {
1982
      return args.getAttributes();
1✔
1983
    }
1984

1985
    @Override
1986
    public String toString() {
1987
      return subchannelLogId.toString();
1✔
1988
    }
1989

1990
    @Override
1991
    public Channel asChannel() {
1992
      checkState(started, "not started");
1✔
1993
      return new SubchannelChannel(
1✔
1994
          subchannel, balancerRpcExecutorHolder.getExecutor(),
1✔
1995
          transportFactory.getScheduledExecutorService(),
1✔
1996
          callTracerFactory.create(),
1✔
1997
          new AtomicReference<InternalConfigSelector>(null));
1998
    }
1999

2000
    @Override
2001
    public Object getInternalSubchannel() {
2002
      checkState(started, "Subchannel is not started");
1✔
2003
      return subchannel;
1✔
2004
    }
2005

2006
    @Override
2007
    public ChannelLogger getChannelLogger() {
2008
      return subchannelLogger;
1✔
2009
    }
2010

2011
    @Override
2012
    public void updateAddresses(List<EquivalentAddressGroup> addrs) {
2013
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
2014
      addressGroups = addrs;
1✔
2015
      if (authorityOverride != null) {
1✔
2016
        addrs = stripOverrideAuthorityAttributes(addrs);
1✔
2017
      }
2018
      subchannel.updateAddresses(addrs);
1✔
2019
    }
1✔
2020

2021
    @Override
2022
    public Attributes getConnectedAddressAttributes() {
2023
      return subchannel.getConnectedAddressAttributes();
1✔
2024
    }
2025

2026
    private List<EquivalentAddressGroup> stripOverrideAuthorityAttributes(
2027
        List<EquivalentAddressGroup> eags) {
2028
      List<EquivalentAddressGroup> eagsWithoutOverrideAttr = new ArrayList<>();
1✔
2029
      for (EquivalentAddressGroup eag : eags) {
1✔
2030
        EquivalentAddressGroup eagWithoutOverrideAttr = new EquivalentAddressGroup(
1✔
2031
            eag.getAddresses(),
1✔
2032
            eag.getAttributes().toBuilder().discard(ATTR_AUTHORITY_OVERRIDE).build());
1✔
2033
        eagsWithoutOverrideAttr.add(eagWithoutOverrideAttr);
1✔
2034
      }
1✔
2035
      return Collections.unmodifiableList(eagsWithoutOverrideAttr);
1✔
2036
    }
2037
  }
2038

2039
  @Override
2040
  public String toString() {
2041
    return MoreObjects.toStringHelper(this)
1✔
2042
        .add("logId", logId.getId())
1✔
2043
        .add("target", target)
1✔
2044
        .toString();
1✔
2045
  }
2046

2047
  /**
2048
   * Called from syncContext.
2049
   */
2050
  private final class DelayedTransportListener implements ManagedClientTransport.Listener {
1✔
2051
    @Override
2052
    public void transportShutdown(Status s) {
2053
      checkState(shutdown.get(), "Channel must have been shut down");
1✔
2054
    }
1✔
2055

2056
    @Override
2057
    public void transportReady() {
2058
      // Don't care
2059
    }
×
2060

2061
    @Override
2062
    public Attributes filterTransport(Attributes attributes) {
2063
      return attributes;
×
2064
    }
2065

2066
    @Override
2067
    public void transportInUse(final boolean inUse) {
2068
      inUseStateAggregator.updateObjectInUse(delayedTransport, inUse);
1✔
2069
      if (inUse) {
1✔
2070
        // It's possible to be in idle mode while inUseStateAggregator is in-use, if one of the
2071
        // subchannels is in use. But we should never be in idle mode when delayed transport is in
2072
        // use.
2073
        exitIdleMode();
1✔
2074
      }
2075
    }
1✔
2076

2077
    @Override
2078
    public void transportTerminated() {
2079
      checkState(shutdown.get(), "Channel must have been shut down");
1✔
2080
      terminating = true;
1✔
2081
      shutdownNameResolverAndLoadBalancer(false);
1✔
2082
      // No need to call channelStateManager since we are already in SHUTDOWN state.
2083
      // Until LoadBalancer is shutdown, it may still create new subchannels.  We catch them
2084
      // here.
2085
      maybeShutdownNowSubchannels();
1✔
2086
      maybeTerminateChannel();
1✔
2087
    }
1✔
2088
  }
2089

2090
  /**
2091
   * Must be accessed from syncContext.
2092
   */
2093
  private final class IdleModeStateAggregator extends InUseStateAggregator<Object> {
1✔
2094
    @Override
2095
    protected void handleInUse() {
2096
      exitIdleMode();
1✔
2097
    }
1✔
2098

2099
    @Override
2100
    protected void handleNotInUse() {
2101
      if (shutdown.get()) {
1✔
2102
        return;
1✔
2103
      }
2104
      rescheduleIdleTimer();
1✔
2105
    }
1✔
2106
  }
2107

2108
  /**
2109
   * Lazily request for Executor from an executor pool.
2110
   * Also act as an Executor directly to simply run a cmd
2111
   */
2112
  @VisibleForTesting
2113
  static final class ExecutorHolder implements Executor {
2114
    private final ObjectPool<? extends Executor> pool;
2115
    private Executor executor;
2116

2117
    ExecutorHolder(ObjectPool<? extends Executor> executorPool) {
1✔
2118
      this.pool = checkNotNull(executorPool, "executorPool");
1✔
2119
    }
1✔
2120

2121
    synchronized Executor getExecutor() {
2122
      if (executor == null) {
1✔
2123
        executor = checkNotNull(pool.getObject(), "%s.getObject()", executor);
1✔
2124
      }
2125
      return executor;
1✔
2126
    }
2127

2128
    synchronized void release() {
2129
      if (executor != null) {
1✔
2130
        executor = pool.returnObject(executor);
1✔
2131
      }
2132
    }
1✔
2133

2134
    @Override
2135
    public void execute(Runnable command) {
2136
      getExecutor().execute(command);
1✔
2137
    }
1✔
2138
  }
2139

2140
  private static final class RestrictedScheduledExecutor implements ScheduledExecutorService {
2141
    final ScheduledExecutorService delegate;
2142

2143
    private RestrictedScheduledExecutor(ScheduledExecutorService delegate) {
1✔
2144
      this.delegate = checkNotNull(delegate, "delegate");
1✔
2145
    }
1✔
2146

2147
    @Override
2148
    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
2149
      return delegate.schedule(callable, delay, unit);
×
2150
    }
2151

2152
    @Override
2153
    public ScheduledFuture<?> schedule(Runnable cmd, long delay, TimeUnit unit) {
2154
      return delegate.schedule(cmd, delay, unit);
1✔
2155
    }
2156

2157
    @Override
2158
    public ScheduledFuture<?> scheduleAtFixedRate(
2159
        Runnable command, long initialDelay, long period, TimeUnit unit) {
2160
      return delegate.scheduleAtFixedRate(command, initialDelay, period, unit);
1✔
2161
    }
2162

2163
    @Override
2164
    public ScheduledFuture<?> scheduleWithFixedDelay(
2165
        Runnable command, long initialDelay, long delay, TimeUnit unit) {
2166
      return delegate.scheduleWithFixedDelay(command, initialDelay, delay, unit);
×
2167
    }
2168

2169
    @Override
2170
    public boolean awaitTermination(long timeout, TimeUnit unit)
2171
        throws InterruptedException {
2172
      return delegate.awaitTermination(timeout, unit);
×
2173
    }
2174

2175
    @Override
2176
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
2177
        throws InterruptedException {
2178
      return delegate.invokeAll(tasks);
×
2179
    }
2180

2181
    @Override
2182
    public <T> List<Future<T>> invokeAll(
2183
        Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
2184
        throws InterruptedException {
2185
      return delegate.invokeAll(tasks, timeout, unit);
×
2186
    }
2187

2188
    @Override
2189
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
2190
        throws InterruptedException, ExecutionException {
2191
      return delegate.invokeAny(tasks);
×
2192
    }
2193

2194
    @Override
2195
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
2196
        throws InterruptedException, ExecutionException, TimeoutException {
2197
      return delegate.invokeAny(tasks, timeout, unit);
×
2198
    }
2199

2200
    @Override
2201
    public boolean isShutdown() {
2202
      return delegate.isShutdown();
×
2203
    }
2204

2205
    @Override
2206
    public boolean isTerminated() {
2207
      return delegate.isTerminated();
×
2208
    }
2209

2210
    @Override
2211
    public void shutdown() {
2212
      throw new UnsupportedOperationException("Restricted: shutdown() is not allowed");
1✔
2213
    }
2214

2215
    @Override
2216
    public List<Runnable> shutdownNow() {
2217
      throw new UnsupportedOperationException("Restricted: shutdownNow() is not allowed");
1✔
2218
    }
2219

2220
    @Override
2221
    public <T> Future<T> submit(Callable<T> task) {
2222
      return delegate.submit(task);
×
2223
    }
2224

2225
    @Override
2226
    public Future<?> submit(Runnable task) {
2227
      return delegate.submit(task);
×
2228
    }
2229

2230
    @Override
2231
    public <T> Future<T> submit(Runnable task, T result) {
2232
      return delegate.submit(task, result);
×
2233
    }
2234

2235
    @Override
2236
    public void execute(Runnable command) {
2237
      delegate.execute(command);
×
2238
    }
×
2239
  }
2240

2241
  /**
   * A ResolutionState indicates the status of last name resolution.
   */
  enum ResolutionState {
    /** No resolution outcome recorded (presumably the initial value -- confirm at the field). */
    NO_RESOLUTION,
    /** The last resolver callback delivered addresses. */
    SUCCESS,
    /** The last resolver callback reported a failure. */
    ERROR
  }
2249
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc