• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19749

24 Mar 2025 09:32PM UTC coverage: 88.579% (+0.001%) from 88.578%
#19749

push

github

web-flow
core: Log any exception during panic because of exception

panic() calls a good amount of code, so it could get another exception.
The SynchronizationContext is running on an arbitrary thread and we
don't want to propagate this secondary exception up its stack (to be
handled by its UncaughtExceptionHandler); if we wanted that we'd
propagate the original exception.

This second exception will only be seen in the logs; the first exception
was logged and will be used to fail RPCs.

Also related to http://yaqs/8493785598685872128 and b692b9d26

34608 of 39070 relevant lines covered (88.58%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.71
/../core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
1
/*
2
 * Copyright 2016 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static com.google.common.base.Preconditions.checkState;
22
import static io.grpc.ClientStreamTracer.NAME_RESOLUTION_DELAYED;
23
import static io.grpc.ConnectivityState.CONNECTING;
24
import static io.grpc.ConnectivityState.IDLE;
25
import static io.grpc.ConnectivityState.SHUTDOWN;
26
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
27
import static io.grpc.EquivalentAddressGroup.ATTR_AUTHORITY_OVERRIDE;
28

29
import com.google.common.annotations.VisibleForTesting;
30
import com.google.common.base.MoreObjects;
31
import com.google.common.base.Stopwatch;
32
import com.google.common.base.Supplier;
33
import com.google.common.util.concurrent.ListenableFuture;
34
import com.google.common.util.concurrent.SettableFuture;
35
import com.google.errorprone.annotations.concurrent.GuardedBy;
36
import io.grpc.Attributes;
37
import io.grpc.CallCredentials;
38
import io.grpc.CallOptions;
39
import io.grpc.Channel;
40
import io.grpc.ChannelCredentials;
41
import io.grpc.ChannelLogger;
42
import io.grpc.ChannelLogger.ChannelLogLevel;
43
import io.grpc.ClientCall;
44
import io.grpc.ClientInterceptor;
45
import io.grpc.ClientInterceptors;
46
import io.grpc.ClientStreamTracer;
47
import io.grpc.ClientTransportFilter;
48
import io.grpc.CompressorRegistry;
49
import io.grpc.ConnectivityState;
50
import io.grpc.ConnectivityStateInfo;
51
import io.grpc.Context;
52
import io.grpc.Deadline;
53
import io.grpc.DecompressorRegistry;
54
import io.grpc.EquivalentAddressGroup;
55
import io.grpc.ForwardingChannelBuilder2;
56
import io.grpc.ForwardingClientCall;
57
import io.grpc.Grpc;
58
import io.grpc.InternalChannelz;
59
import io.grpc.InternalChannelz.ChannelStats;
60
import io.grpc.InternalChannelz.ChannelTrace;
61
import io.grpc.InternalConfigSelector;
62
import io.grpc.InternalInstrumented;
63
import io.grpc.InternalLogId;
64
import io.grpc.InternalWithLogId;
65
import io.grpc.LoadBalancer;
66
import io.grpc.LoadBalancer.CreateSubchannelArgs;
67
import io.grpc.LoadBalancer.PickResult;
68
import io.grpc.LoadBalancer.PickSubchannelArgs;
69
import io.grpc.LoadBalancer.ResolvedAddresses;
70
import io.grpc.LoadBalancer.SubchannelPicker;
71
import io.grpc.LoadBalancer.SubchannelStateListener;
72
import io.grpc.ManagedChannel;
73
import io.grpc.ManagedChannelBuilder;
74
import io.grpc.Metadata;
75
import io.grpc.MethodDescriptor;
76
import io.grpc.MetricInstrumentRegistry;
77
import io.grpc.MetricRecorder;
78
import io.grpc.NameResolver;
79
import io.grpc.NameResolver.ConfigOrError;
80
import io.grpc.NameResolver.ResolutionResult;
81
import io.grpc.NameResolverProvider;
82
import io.grpc.NameResolverRegistry;
83
import io.grpc.ProxyDetector;
84
import io.grpc.Status;
85
import io.grpc.StatusOr;
86
import io.grpc.SynchronizationContext;
87
import io.grpc.SynchronizationContext.ScheduledHandle;
88
import io.grpc.internal.AutoConfiguredLoadBalancerFactory.AutoConfiguredLoadBalancer;
89
import io.grpc.internal.ClientCallImpl.ClientStreamProvider;
90
import io.grpc.internal.ClientTransportFactory.SwapChannelCredentialsResult;
91
import io.grpc.internal.ManagedChannelImplBuilder.ClientTransportFactoryBuilder;
92
import io.grpc.internal.ManagedChannelImplBuilder.FixedPortProvider;
93
import io.grpc.internal.ManagedChannelServiceConfig.MethodInfo;
94
import io.grpc.internal.ManagedChannelServiceConfig.ServiceConfigConvertedSelector;
95
import io.grpc.internal.RetriableStream.ChannelBufferMeter;
96
import io.grpc.internal.RetriableStream.Throttle;
97
import io.grpc.internal.RetryingNameResolver.ResolutionResultListener;
98
import java.net.URI;
99
import java.util.ArrayList;
100
import java.util.Collection;
101
import java.util.Collections;
102
import java.util.HashSet;
103
import java.util.LinkedHashSet;
104
import java.util.List;
105
import java.util.Map;
106
import java.util.Set;
107
import java.util.concurrent.Callable;
108
import java.util.concurrent.CountDownLatch;
109
import java.util.concurrent.ExecutionException;
110
import java.util.concurrent.Executor;
111
import java.util.concurrent.Future;
112
import java.util.concurrent.ScheduledExecutorService;
113
import java.util.concurrent.ScheduledFuture;
114
import java.util.concurrent.TimeUnit;
115
import java.util.concurrent.TimeoutException;
116
import java.util.concurrent.atomic.AtomicBoolean;
117
import java.util.concurrent.atomic.AtomicReference;
118
import java.util.logging.Level;
119
import java.util.logging.Logger;
120
import javax.annotation.Nullable;
121
import javax.annotation.concurrent.ThreadSafe;
122

123
/** A communication channel for making outgoing RPCs. */
124
@ThreadSafe
125
final class ManagedChannelImpl extends ManagedChannel implements
126
    InternalInstrumented<ChannelStats> {
127
  @VisibleForTesting
128
  static final Logger logger = Logger.getLogger(ManagedChannelImpl.class.getName());
1✔
129

130
  static final long IDLE_TIMEOUT_MILLIS_DISABLE = -1;
131

132
  static final long SUBCHANNEL_SHUTDOWN_DELAY_SECONDS = 5;
133

134
  @VisibleForTesting
135
  static final Status SHUTDOWN_NOW_STATUS =
1✔
136
      Status.UNAVAILABLE.withDescription("Channel shutdownNow invoked");
1✔
137

138
  @VisibleForTesting
139
  static final Status SHUTDOWN_STATUS =
1✔
140
      Status.UNAVAILABLE.withDescription("Channel shutdown invoked");
1✔
141

142
  @VisibleForTesting
143
  static final Status SUBCHANNEL_SHUTDOWN_STATUS =
1✔
144
      Status.UNAVAILABLE.withDescription("Subchannel shutdown invoked");
1✔
145

146
  private static final ManagedChannelServiceConfig EMPTY_SERVICE_CONFIG =
147
      ManagedChannelServiceConfig.empty();
1✔
148
  private static final InternalConfigSelector INITIAL_PENDING_SELECTOR =
1✔
149
      new InternalConfigSelector() {
1✔
150
        @Override
151
        public Result selectConfig(PickSubchannelArgs args) {
152
          throw new IllegalStateException("Resolution is pending");
×
153
        }
154
      };
155
  private static final LoadBalancer.PickDetailsConsumer NOOP_PICK_DETAILS_CONSUMER =
1✔
156
      new LoadBalancer.PickDetailsConsumer() {};
1✔
157

158
  private final InternalLogId logId;
159
  private final String target;
160
  @Nullable
161
  private final String authorityOverride;
162
  private final NameResolverRegistry nameResolverRegistry;
163
  private final URI targetUri;
164
  private final NameResolverProvider nameResolverProvider;
165
  private final NameResolver.Args nameResolverArgs;
166
  private final AutoConfiguredLoadBalancerFactory loadBalancerFactory;
167
  private final ClientTransportFactory originalTransportFactory;
168
  @Nullable
169
  private final ChannelCredentials originalChannelCreds;
170
  private final ClientTransportFactory transportFactory;
171
  private final ClientTransportFactory oobTransportFactory;
172
  private final RestrictedScheduledExecutor scheduledExecutor;
173
  private final Executor executor;
174
  private final ObjectPool<? extends Executor> executorPool;
175
  private final ObjectPool<? extends Executor> balancerRpcExecutorPool;
176
  private final ExecutorHolder balancerRpcExecutorHolder;
177
  private final ExecutorHolder offloadExecutorHolder;
178
  private final TimeProvider timeProvider;
179
  private final int maxTraceEvents;
180

181
  @VisibleForTesting
1✔
182
  final SynchronizationContext syncContext = new SynchronizationContext(
183
      new Thread.UncaughtExceptionHandler() {
1✔
184
        @Override
185
        public void uncaughtException(Thread t, Throwable e) {
186
          logger.log(
1✔
187
              Level.SEVERE,
188
              "[" + getLogId() + "] Uncaught exception in the SynchronizationContext. Panic!",
1✔
189
              e);
190
          try {
191
            panic(e);
1✔
192
          } catch (Throwable anotherT) {
×
193
            logger.log(
×
194
                Level.SEVERE, "[" + getLogId() + "] Uncaught exception while panicking", anotherT);
×
195
          }
1✔
196
        }
1✔
197
      });
198

199
  private boolean fullStreamDecompression;
200

201
  private final DecompressorRegistry decompressorRegistry;
202
  private final CompressorRegistry compressorRegistry;
203

204
  private final Supplier<Stopwatch> stopwatchSupplier;
205
  /** The timeout before entering idle mode. */
206
  private final long idleTimeoutMillis;
207

208
  private final ConnectivityStateManager channelStateManager = new ConnectivityStateManager();
1✔
209
  private final BackoffPolicy.Provider backoffPolicyProvider;
210

211
  /**
212
   * We delegate to this channel, so that we can have interceptors as necessary. If there aren't
213
   * any interceptors and the {@link io.grpc.BinaryLog} is {@code null} then this will just be a
214
   * {@link RealChannel}.
215
   */
216
  private final Channel interceptorChannel;
217

218
  private final List<ClientTransportFilter> transportFilters;
219
  @Nullable private final String userAgent;
220

221
  // Only null after channel is terminated. Must be assigned from the syncContext.
222
  private NameResolver nameResolver;
223

224
  // Must be accessed from the syncContext.
225
  private boolean nameResolverStarted;
226

227
  // null when channel is in idle mode.  Must be assigned from syncContext.
228
  @Nullable
229
  private LbHelperImpl lbHelper;
230

231
  // Must ONLY be assigned from updateSubchannelPicker(), which is called from syncContext.
232
  // null if channel is in idle mode.
233
  @Nullable
234
  private volatile SubchannelPicker subchannelPicker;
235

236
  // Must be accessed from the syncContext
237
  private boolean panicMode;
238

239
  // Must be mutated from syncContext
240
  // If any monitoring hook to be added later needs to get a snapshot of this Set, we could
241
  // switch to a ConcurrentHashMap.
242
  private final Set<InternalSubchannel> subchannels = new HashSet<>(16, .75f);
1✔
243

244
  // Must be accessed from syncContext
245
  @Nullable
246
  private Collection<RealChannel.PendingCall<?, ?>> pendingCalls;
247
  private final Object pendingCallsInUseObject = new Object();
1✔
248

249
  // Must be mutated from syncContext
250
  private final Set<OobChannel> oobChannels = new HashSet<>(1, .75f);
1✔
251

252
  // reprocess() must be run from syncContext
253
  private final DelayedClientTransport delayedTransport;
254
  private final UncommittedRetriableStreamsRegistry uncommittedRetriableStreamsRegistry
1✔
255
      = new UncommittedRetriableStreamsRegistry();
256

257
  // Shutdown states.
258
  //
259
  // Channel's shutdown process:
260
  // 1. shutdown(): stop accepting new calls from applications
261
  //   1a shutdown <- true
262
  //   1b subchannelPicker <- null
263
  //   1c delayedTransport.shutdown()
264
  // 2. delayedTransport terminated: stop stream-creation functionality
265
  //   2a terminating <- true
266
  //   2b loadBalancer.shutdown()
267
  //     * LoadBalancer will shutdown subchannels and OOB channels
268
  //   2c loadBalancer <- null
269
  //   2d nameResolver.shutdown()
270
  //   2e nameResolver <- null
271
  // 3. All subchannels and OOB channels terminated: Channel considered terminated
272

273
  private final AtomicBoolean shutdown = new AtomicBoolean(false);
1✔
274
  // Must only be mutated and read from syncContext
275
  private boolean shutdownNowed;
276
  // Must only be mutated from syncContext
277
  private boolean terminating;
278
  // Must be mutated from syncContext
279
  private volatile boolean terminated;
280
  private final CountDownLatch terminatedLatch = new CountDownLatch(1);
1✔
281

282
  private final CallTracer.Factory callTracerFactory;
283
  private final CallTracer channelCallTracer;
284
  private final ChannelTracer channelTracer;
285
  private final ChannelLogger channelLogger;
286
  private final InternalChannelz channelz;
287
  private final RealChannel realChannel;
288
  // Must be mutated and read from syncContext
289
  // a flag for doing channel tracing when flipped
290
  private ResolutionState lastResolutionState = ResolutionState.NO_RESOLUTION;
1✔
291
  // Must be mutated and read from constructor or syncContext
292
  // used for channel tracing when value changed
293
  private ManagedChannelServiceConfig lastServiceConfig = EMPTY_SERVICE_CONFIG;
1✔
294

295
  @Nullable
296
  private final ManagedChannelServiceConfig defaultServiceConfig;
297
  // Must be mutated and read from constructor or syncContext
298
  private boolean serviceConfigUpdated = false;
1✔
299
  private final boolean lookUpServiceConfig;
300

301
  // One instance per channel.
302
  private final ChannelBufferMeter channelBufferUsed = new ChannelBufferMeter();
1✔
303

304
  private final long perRpcBufferLimit;
305
  private final long channelBufferLimit;
306

307
  // Temporary false flag that can skip the retry code path.
308
  private final boolean retryEnabled;
309

310
  private final Deadline.Ticker ticker = Deadline.getSystemTicker();
1✔
311

312
  // Called from syncContext
313
  private final ManagedClientTransport.Listener delayedTransportListener =
1✔
314
      new DelayedTransportListener();
315

316
  // Must be called from syncContext
317
  private void maybeShutdownNowSubchannels() {
318
    if (shutdownNowed) {
1✔
319
      for (InternalSubchannel subchannel : subchannels) {
1✔
320
        subchannel.shutdownNow(SHUTDOWN_NOW_STATUS);
1✔
321
      }
1✔
322
      for (OobChannel oobChannel : oobChannels) {
1✔
323
        oobChannel.getInternalSubchannel().shutdownNow(SHUTDOWN_NOW_STATUS);
1✔
324
      }
1✔
325
    }
326
  }
1✔
327

328
  // Must be accessed from syncContext
329
  @VisibleForTesting
1✔
330
  final InUseStateAggregator<Object> inUseStateAggregator = new IdleModeStateAggregator();
331

332
  @Override
333
  public ListenableFuture<ChannelStats> getStats() {
334
    final SettableFuture<ChannelStats> ret = SettableFuture.create();
1✔
335
    final class StatsFetcher implements Runnable {
1✔
336
      @Override
337
      public void run() {
338
        ChannelStats.Builder builder = new InternalChannelz.ChannelStats.Builder();
1✔
339
        channelCallTracer.updateBuilder(builder);
1✔
340
        channelTracer.updateBuilder(builder);
1✔
341
        builder.setTarget(target).setState(channelStateManager.getState());
1✔
342
        List<InternalWithLogId> children = new ArrayList<>();
1✔
343
        children.addAll(subchannels);
1✔
344
        children.addAll(oobChannels);
1✔
345
        builder.setSubchannels(children);
1✔
346
        ret.set(builder.build());
1✔
347
      }
1✔
348
    }
349

350
    // subchannels and oobchannels can only be accessed from syncContext
351
    syncContext.execute(new StatsFetcher());
1✔
352
    return ret;
1✔
353
  }
354

355
  @Override
356
  public InternalLogId getLogId() {
357
    return logId;
1✔
358
  }
359

360
  // Run from syncContext
361
  private class IdleModeTimer implements Runnable {
1✔
362

363
    @Override
364
    public void run() {
365
      // Workaround timer scheduled while in idle mode. This can happen from handleNotInUse() after
366
      // an explicit enterIdleMode() by the user. Protecting here as other locations are a bit too
367
      // subtle to change rapidly to resolve the channel panic. See #8714
368
      if (lbHelper == null) {
1✔
369
        return;
×
370
      }
371
      enterIdleMode();
1✔
372
    }
1✔
373
  }
374

375
  // Must be called from syncContext
376
  private void shutdownNameResolverAndLoadBalancer(boolean channelIsActive) {
377
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
378
    if (channelIsActive) {
1✔
379
      checkState(nameResolverStarted, "nameResolver is not started");
1✔
380
      checkState(lbHelper != null, "lbHelper is null");
1✔
381
    }
382
    if (nameResolver != null) {
1✔
383
      nameResolver.shutdown();
1✔
384
      nameResolverStarted = false;
1✔
385
      if (channelIsActive) {
1✔
386
        nameResolver = getNameResolver(
1✔
387
            targetUri, authorityOverride, nameResolverProvider, nameResolverArgs);
388
      } else {
389
        nameResolver = null;
1✔
390
      }
391
    }
392
    if (lbHelper != null) {
1✔
393
      lbHelper.lb.shutdown();
1✔
394
      lbHelper = null;
1✔
395
    }
396
    subchannelPicker = null;
1✔
397
  }
1✔
398

399
  /**
400
   * Make the channel exit idle mode, if it's in it.
401
   *
402
   * <p>Must be called from syncContext
403
   */
404
  @VisibleForTesting
405
  void exitIdleMode() {
406
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
407
    if (shutdown.get() || panicMode) {
1✔
408
      return;
1✔
409
    }
410
    if (inUseStateAggregator.isInUse()) {
1✔
411
      // Cancel the timer now, so that a racing due timer will not put Channel on idleness
412
      // when the caller of exitIdleMode() is about to use the returned loadBalancer.
413
      cancelIdleTimer(false);
1✔
414
    } else {
415
      // exitIdleMode() may be called outside of inUseStateAggregator.handleNotInUse() while
416
      // isInUse() == false, in which case we still need to schedule the timer.
417
      rescheduleIdleTimer();
1✔
418
    }
419
    if (lbHelper != null) {
1✔
420
      return;
1✔
421
    }
422
    channelLogger.log(ChannelLogLevel.INFO, "Exiting idle mode");
1✔
423
    LbHelperImpl lbHelper = new LbHelperImpl();
1✔
424
    lbHelper.lb = loadBalancerFactory.newLoadBalancer(lbHelper);
1✔
425
    // Delay setting lbHelper until fully initialized, since loadBalancerFactory is user code and
426
    // may throw. We don't want to confuse our state, even if we will enter panic mode.
427
    this.lbHelper = lbHelper;
1✔
428

429
    channelStateManager.gotoState(CONNECTING);
1✔
430
    NameResolverListener listener = new NameResolverListener(lbHelper, nameResolver);
1✔
431
    nameResolver.start(listener);
1✔
432
    nameResolverStarted = true;
1✔
433
  }
1✔
434

435
  // Must be run from syncContext
436
  private void enterIdleMode() {
437
    // nameResolver and loadBalancer are guaranteed to be non-null.  If any of them were null,
438
    // either the idleModeTimer ran twice without exiting the idle mode, or the task in shutdown()
439
    // did not cancel idleModeTimer, or enterIdle() ran while shutdown or in idle, all of
440
    // which are bugs.
441
    shutdownNameResolverAndLoadBalancer(true);
1✔
442
    delayedTransport.reprocess(null);
1✔
443
    channelLogger.log(ChannelLogLevel.INFO, "Entering IDLE state");
1✔
444
    channelStateManager.gotoState(IDLE);
1✔
445
    // If the inUseStateAggregator still considers pending calls to be queued up or the delayed
446
    // transport to be holding some we need to exit idle mode to give these calls a chance to
447
    // be processed.
448
    if (inUseStateAggregator.anyObjectInUse(pendingCallsInUseObject, delayedTransport)) {
1✔
449
      exitIdleMode();
1✔
450
    }
451
  }
1✔
452

453
  // Must be run from syncContext
454
  private void cancelIdleTimer(boolean permanent) {
455
    idleTimer.cancel(permanent);
1✔
456
  }
1✔
457

458
  // Always run from syncContext
459
  private void rescheduleIdleTimer() {
460
    if (idleTimeoutMillis == IDLE_TIMEOUT_MILLIS_DISABLE) {
1✔
461
      return;
1✔
462
    }
463
    idleTimer.reschedule(idleTimeoutMillis, TimeUnit.MILLISECONDS);
1✔
464
  }
1✔
465

466
  /**
467
   * Force name resolution refresh to happen immediately. Must be run
468
   * from syncContext.
469
   */
470
  private void refreshNameResolution() {
471
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
472
    if (nameResolverStarted) {
1✔
473
      nameResolver.refresh();
1✔
474
    }
475
  }
1✔
476

477
  private final class ChannelStreamProvider implements ClientStreamProvider {
1✔
478
    volatile Throttle throttle;
479

480
    @Override
481
    public ClientStream newStream(
482
        final MethodDescriptor<?, ?> method,
483
        final CallOptions callOptions,
484
        final Metadata headers,
485
        final Context context) {
486
      // There is no need to reschedule the idle timer here. If the channel isn't shut down, either
487
      // the delayed transport or a real transport will go in-use and cancel the idle timer.
488
      if (!retryEnabled) {
1✔
489
        ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers(
1✔
490
            callOptions, headers, 0, /* isTransparentRetry= */ false);
491
        Context origContext = context.attach();
1✔
492
        try {
493
          return delayedTransport.newStream(method, headers, callOptions, tracers);
1✔
494
        } finally {
495
          context.detach(origContext);
1✔
496
        }
497
      } else {
498
        MethodInfo methodInfo = callOptions.getOption(MethodInfo.KEY);
1✔
499
        final RetryPolicy retryPolicy = methodInfo == null ? null : methodInfo.retryPolicy;
1✔
500
        final HedgingPolicy hedgingPolicy = methodInfo == null ? null : methodInfo.hedgingPolicy;
1✔
501
        final class RetryStream<ReqT> extends RetriableStream<ReqT> {
502
          @SuppressWarnings("unchecked")
503
          RetryStream() {
1✔
504
            super(
1✔
505
                (MethodDescriptor<ReqT, ?>) method,
506
                headers,
507
                channelBufferUsed,
1✔
508
                perRpcBufferLimit,
1✔
509
                channelBufferLimit,
1✔
510
                getCallExecutor(callOptions),
1✔
511
                transportFactory.getScheduledExecutorService(),
1✔
512
                retryPolicy,
513
                hedgingPolicy,
514
                throttle);
515
          }
1✔
516

517
          @Override
518
          Status prestart() {
519
            return uncommittedRetriableStreamsRegistry.add(this);
1✔
520
          }
521

522
          @Override
523
          void postCommit() {
524
            uncommittedRetriableStreamsRegistry.remove(this);
1✔
525
          }
1✔
526

527
          @Override
528
          ClientStream newSubstream(
529
              Metadata newHeaders, ClientStreamTracer.Factory factory, int previousAttempts,
530
              boolean isTransparentRetry) {
531
            CallOptions newOptions = callOptions.withStreamTracerFactory(factory);
1✔
532
            ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers(
1✔
533
                newOptions, newHeaders, previousAttempts, isTransparentRetry);
534
            Context origContext = context.attach();
1✔
535
            try {
536
              return delayedTransport.newStream(method, newHeaders, newOptions, tracers);
1✔
537
            } finally {
538
              context.detach(origContext);
1✔
539
            }
540
          }
541
        }
542

543
        return new RetryStream<>();
1✔
544
      }
545
    }
546
  }
547

548
  private final ChannelStreamProvider transportProvider = new ChannelStreamProvider();
1✔
549

550
  private final Rescheduler idleTimer;
551
  private final MetricRecorder metricRecorder;
552

553
  ManagedChannelImpl(
554
      ManagedChannelImplBuilder builder,
555
      ClientTransportFactory clientTransportFactory,
556
      URI targetUri,
557
      NameResolverProvider nameResolverProvider,
558
      BackoffPolicy.Provider backoffPolicyProvider,
559
      ObjectPool<? extends Executor> balancerRpcExecutorPool,
560
      Supplier<Stopwatch> stopwatchSupplier,
561
      List<ClientInterceptor> interceptors,
562
      final TimeProvider timeProvider) {
1✔
563
    this.target = checkNotNull(builder.target, "target");
1✔
564
    this.logId = InternalLogId.allocate("Channel", target);
1✔
565
    this.timeProvider = checkNotNull(timeProvider, "timeProvider");
1✔
566
    this.executorPool = checkNotNull(builder.executorPool, "executorPool");
1✔
567
    this.executor = checkNotNull(executorPool.getObject(), "executor");
1✔
568
    this.originalChannelCreds = builder.channelCredentials;
1✔
569
    this.originalTransportFactory = clientTransportFactory;
1✔
570
    this.offloadExecutorHolder =
1✔
571
        new ExecutorHolder(checkNotNull(builder.offloadExecutorPool, "offloadExecutorPool"));
1✔
572
    this.transportFactory = new CallCredentialsApplyingTransportFactory(
1✔
573
        clientTransportFactory, builder.callCredentials, this.offloadExecutorHolder);
574
    this.oobTransportFactory = new CallCredentialsApplyingTransportFactory(
1✔
575
        clientTransportFactory, null, this.offloadExecutorHolder);
576
    this.scheduledExecutor =
1✔
577
        new RestrictedScheduledExecutor(transportFactory.getScheduledExecutorService());
1✔
578
    maxTraceEvents = builder.maxTraceEvents;
1✔
579
    channelTracer = new ChannelTracer(
1✔
580
        logId, builder.maxTraceEvents, timeProvider.currentTimeNanos(),
1✔
581
        "Channel for '" + target + "'");
582
    channelLogger = new ChannelLoggerImpl(channelTracer, timeProvider);
1✔
583
    ProxyDetector proxyDetector =
584
        builder.proxyDetector != null ? builder.proxyDetector : GrpcUtil.DEFAULT_PROXY_DETECTOR;
1✔
585
    this.retryEnabled = builder.retryEnabled;
1✔
586
    this.loadBalancerFactory = new AutoConfiguredLoadBalancerFactory(builder.defaultLbPolicy);
1✔
587
    this.nameResolverRegistry = builder.nameResolverRegistry;
1✔
588
    this.targetUri = checkNotNull(targetUri, "targetUri");
1✔
589
    this.nameResolverProvider = checkNotNull(nameResolverProvider, "nameResolverProvider");
1✔
590
    ScParser serviceConfigParser =
1✔
591
        new ScParser(
592
            retryEnabled,
593
            builder.maxRetryAttempts,
594
            builder.maxHedgedAttempts,
595
            loadBalancerFactory);
596
    this.authorityOverride = builder.authorityOverride;
1✔
597
    this.metricRecorder = new MetricRecorderImpl(builder.metricSinks,
1✔
598
        MetricInstrumentRegistry.getDefaultRegistry());
1✔
599
    NameResolver.Args.Builder nameResolverArgsBuilder = NameResolver.Args.newBuilder()
1✔
600
            .setDefaultPort(builder.getDefaultPort())
1✔
601
            .setProxyDetector(proxyDetector)
1✔
602
            .setSynchronizationContext(syncContext)
1✔
603
            .setScheduledExecutorService(scheduledExecutor)
1✔
604
            .setServiceConfigParser(serviceConfigParser)
1✔
605
            .setChannelLogger(channelLogger)
1✔
606
            .setOffloadExecutor(this.offloadExecutorHolder)
1✔
607
            .setOverrideAuthority(this.authorityOverride)
1✔
608
            .setMetricRecorder(this.metricRecorder);
1✔
609
    builder.copyAllNameResolverCustomArgsTo(nameResolverArgsBuilder);
1✔
610
    this.nameResolverArgs = nameResolverArgsBuilder.build();
1✔
611
    this.nameResolver = getNameResolver(
1✔
612
        targetUri, authorityOverride, nameResolverProvider, nameResolverArgs);
613
    this.balancerRpcExecutorPool = checkNotNull(balancerRpcExecutorPool, "balancerRpcExecutorPool");
1✔
614
    this.balancerRpcExecutorHolder = new ExecutorHolder(balancerRpcExecutorPool);
1✔
615
    this.delayedTransport = new DelayedClientTransport(this.executor, this.syncContext);
1✔
616
    this.delayedTransport.start(delayedTransportListener);
1✔
617
    this.backoffPolicyProvider = backoffPolicyProvider;
1✔
618

619
    if (builder.defaultServiceConfig != null) {
1✔
620
      ConfigOrError parsedDefaultServiceConfig =
1✔
621
          serviceConfigParser.parseServiceConfig(builder.defaultServiceConfig);
1✔
622
      checkState(
1✔
623
          parsedDefaultServiceConfig.getError() == null,
1✔
624
          "Default config is invalid: %s",
625
          parsedDefaultServiceConfig.getError());
1✔
626
      this.defaultServiceConfig =
1✔
627
          (ManagedChannelServiceConfig) parsedDefaultServiceConfig.getConfig();
1✔
628
      this.transportProvider.throttle = this.defaultServiceConfig.getRetryThrottling();
1✔
629
    } else {
1✔
630
      this.defaultServiceConfig = null;
1✔
631
    }
632
    this.lookUpServiceConfig = builder.lookUpServiceConfig;
1✔
633
    realChannel = new RealChannel(nameResolver.getServiceAuthority());
1✔
634
    Channel channel = realChannel;
1✔
635
    if (builder.binlog != null) {
1✔
636
      channel = builder.binlog.wrapChannel(channel);
1✔
637
    }
638
    this.interceptorChannel = ClientInterceptors.intercept(channel, interceptors);
1✔
639
    this.transportFilters = new ArrayList<>(builder.transportFilters);
1✔
640
    this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier");
1✔
641
    if (builder.idleTimeoutMillis == IDLE_TIMEOUT_MILLIS_DISABLE) {
1✔
642
      this.idleTimeoutMillis = builder.idleTimeoutMillis;
1✔
643
    } else {
644
      checkArgument(
1✔
645
          builder.idleTimeoutMillis
646
              >= ManagedChannelImplBuilder.IDLE_MODE_MIN_TIMEOUT_MILLIS,
647
          "invalid idleTimeoutMillis %s", builder.idleTimeoutMillis);
648
      this.idleTimeoutMillis = builder.idleTimeoutMillis;
1✔
649
    }
650

651
    idleTimer = new Rescheduler(
1✔
652
        new IdleModeTimer(),
653
        syncContext,
654
        transportFactory.getScheduledExecutorService(),
1✔
655
        stopwatchSupplier.get());
1✔
656
    this.fullStreamDecompression = builder.fullStreamDecompression;
1✔
657
    this.decompressorRegistry = checkNotNull(builder.decompressorRegistry, "decompressorRegistry");
1✔
658
    this.compressorRegistry = checkNotNull(builder.compressorRegistry, "compressorRegistry");
1✔
659
    this.userAgent = builder.userAgent;
1✔
660

661
    this.channelBufferLimit = builder.retryBufferSize;
1✔
662
    this.perRpcBufferLimit = builder.perRpcBufferLimit;
1✔
663
    final class ChannelCallTracerFactory implements CallTracer.Factory {
1✔
664
      @Override
665
      public CallTracer create() {
666
        return new CallTracer(timeProvider);
1✔
667
      }
668
    }
669

670
    this.callTracerFactory = new ChannelCallTracerFactory();
1✔
671
    channelCallTracer = callTracerFactory.create();
1✔
672
    this.channelz = checkNotNull(builder.channelz);
1✔
673
    channelz.addRootChannel(this);
1✔
674

675
    if (!lookUpServiceConfig) {
1✔
676
      if (defaultServiceConfig != null) {
1✔
677
        channelLogger.log(
1✔
678
            ChannelLogLevel.INFO, "Service config look-up disabled, using default service config");
679
      }
680
      serviceConfigUpdated = true;
1✔
681
    }
682
  }
1✔
683

684
  @VisibleForTesting
685
  static NameResolver getNameResolver(
686
      URI targetUri, @Nullable final String overrideAuthority,
687
      NameResolverProvider provider, NameResolver.Args nameResolverArgs) {
688
    NameResolver resolver = provider.newNameResolver(targetUri, nameResolverArgs);
1✔
689
    if (resolver == null) {
1✔
690
      throw new IllegalArgumentException("cannot create a NameResolver for " + targetUri);
1✔
691
    }
692

693
    // We wrap the name resolver in a RetryingNameResolver to give it the ability to retry failures.
694
    // TODO: After a transition period, all NameResolver implementations that need retry should use
695
    //       RetryingNameResolver directly and this step can be removed.
696
    NameResolver usedNameResolver = new RetryingNameResolver(resolver,
1✔
697
          new BackoffPolicyRetryScheduler(new ExponentialBackoffPolicy.Provider(),
698
              nameResolverArgs.getScheduledExecutorService(),
1✔
699
              nameResolverArgs.getSynchronizationContext()),
1✔
700
          nameResolverArgs.getSynchronizationContext());
1✔
701

702
    if (overrideAuthority == null) {
1✔
703
      return usedNameResolver;
1✔
704
    }
705

706
    return new ForwardingNameResolver(usedNameResolver) {
1✔
707
      @Override
708
      public String getServiceAuthority() {
709
        return overrideAuthority;
1✔
710
      }
711
    };
712
  }
713

714
  @VisibleForTesting
715
  InternalConfigSelector getConfigSelector() {
716
    return realChannel.configSelector.get();
1✔
717
  }
718
  
719
  @VisibleForTesting
720
  boolean hasThrottle() {
721
    return this.transportProvider.throttle != null;
1✔
722
  }
723

724
  /**
725
   * Initiates an orderly shutdown in which preexisting calls continue but new calls are immediately
726
   * cancelled.
727
   */
728
  @Override
729
  public ManagedChannelImpl shutdown() {
730
    channelLogger.log(ChannelLogLevel.DEBUG, "shutdown() called");
1✔
731
    if (!shutdown.compareAndSet(false, true)) {
1✔
732
      return this;
1✔
733
    }
734
    final class Shutdown implements Runnable {
1✔
735
      @Override
736
      public void run() {
737
        channelLogger.log(ChannelLogLevel.INFO, "Entering SHUTDOWN state");
1✔
738
        channelStateManager.gotoState(SHUTDOWN);
1✔
739
      }
1✔
740
    }
741

742
    syncContext.execute(new Shutdown());
1✔
743
    realChannel.shutdown();
1✔
744
    final class CancelIdleTimer implements Runnable {
1✔
745
      @Override
746
      public void run() {
747
        cancelIdleTimer(/* permanent= */ true);
1✔
748
      }
1✔
749
    }
750

751
    syncContext.execute(new CancelIdleTimer());
1✔
752
    return this;
1✔
753
  }
754

755
  /**
756
   * Initiates a forceful shutdown in which preexisting and new calls are cancelled. Although
757
   * forceful, the shutdown process is still not instantaneous; {@link #isTerminated()} will likely
758
   * return {@code false} immediately after this method returns.
759
   */
760
  @Override
761
  public ManagedChannelImpl shutdownNow() {
762
    channelLogger.log(ChannelLogLevel.DEBUG, "shutdownNow() called");
1✔
763
    shutdown();
1✔
764
    realChannel.shutdownNow();
1✔
765
    final class ShutdownNow implements Runnable {
1✔
766
      @Override
767
      public void run() {
768
        if (shutdownNowed) {
1✔
769
          return;
1✔
770
        }
771
        shutdownNowed = true;
1✔
772
        maybeShutdownNowSubchannels();
1✔
773
      }
1✔
774
    }
775

776
    syncContext.execute(new ShutdownNow());
1✔
777
    return this;
1✔
778
  }
779

780
  // Called from syncContext
781
  @VisibleForTesting
782
  void panic(final Throwable t) {
783
    if (panicMode) {
1✔
784
      // Preserve the first panic information
785
      return;
×
786
    }
787
    panicMode = true;
1✔
788
    try {
789
      cancelIdleTimer(/* permanent= */ true);
1✔
790
      shutdownNameResolverAndLoadBalancer(false);
1✔
791
    } finally {
792
      updateSubchannelPicker(new LoadBalancer.FixedResultPicker(PickResult.withDrop(
1✔
793
          Status.INTERNAL.withDescription("Panic! This is a bug!").withCause(t))));
1✔
794
      realChannel.updateConfigSelector(null);
1✔
795
      channelLogger.log(ChannelLogLevel.ERROR, "PANIC! Entering TRANSIENT_FAILURE");
1✔
796
      channelStateManager.gotoState(TRANSIENT_FAILURE);
1✔
797
    }
798
  }
1✔
799

800
  @VisibleForTesting
801
  boolean isInPanicMode() {
802
    return panicMode;
1✔
803
  }
804

805
  // Called from syncContext
806
  private void updateSubchannelPicker(SubchannelPicker newPicker) {
807
    subchannelPicker = newPicker;
1✔
808
    delayedTransport.reprocess(newPicker);
1✔
809
  }
1✔
810

811
  @Override
812
  public boolean isShutdown() {
813
    return shutdown.get();
1✔
814
  }
815

816
  @Override
817
  public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
818
    return terminatedLatch.await(timeout, unit);
1✔
819
  }
820

821
  @Override
822
  public boolean isTerminated() {
823
    return terminated;
1✔
824
  }
825

826
  /*
827
   * Creates a new outgoing call on the channel.
828
   */
829
  @Override
830
  public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(MethodDescriptor<ReqT, RespT> method,
831
      CallOptions callOptions) {
832
    return interceptorChannel.newCall(method, callOptions);
1✔
833
  }
834

835
  @Override
836
  public String authority() {
837
    return interceptorChannel.authority();
1✔
838
  }
839

840
  private Executor getCallExecutor(CallOptions callOptions) {
841
    Executor executor = callOptions.getExecutor();
1✔
842
    if (executor == null) {
1✔
843
      executor = this.executor;
1✔
844
    }
845
    return executor;
1✔
846
  }
847

848
  private class RealChannel extends Channel {
849
    // Reference to null if no config selector is available from resolution result
850
    // Reference must be set() from syncContext
851
    private final AtomicReference<InternalConfigSelector> configSelector =
1✔
852
        new AtomicReference<>(INITIAL_PENDING_SELECTOR);
1✔
853
    // Set when the NameResolver is initially created. When we create a new NameResolver for the
854
    // same target, the new instance must have the same value.
855
    private final String authority;
856

857
    private final Channel clientCallImplChannel = new Channel() {
1✔
858
      @Override
859
      public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
860
          MethodDescriptor<RequestT, ResponseT> method, CallOptions callOptions) {
861
        return new ClientCallImpl<>(
1✔
862
            method,
863
            getCallExecutor(callOptions),
1✔
864
            callOptions,
865
            transportProvider,
1✔
866
            terminated ? null : transportFactory.getScheduledExecutorService(),
1✔
867
            channelCallTracer,
1✔
868
            null)
869
            .setFullStreamDecompression(fullStreamDecompression)
1✔
870
            .setDecompressorRegistry(decompressorRegistry)
1✔
871
            .setCompressorRegistry(compressorRegistry);
1✔
872
      }
873

874
      @Override
875
      public String authority() {
876
        return authority;
×
877
      }
878
    };
879

880
    private RealChannel(String authority) {
1✔
881
      this.authority =  checkNotNull(authority, "authority");
1✔
882
    }
1✔
883

884
    @Override
885
    public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(
886
        MethodDescriptor<ReqT, RespT> method, CallOptions callOptions) {
887
      if (configSelector.get() != INITIAL_PENDING_SELECTOR) {
1✔
888
        return newClientCall(method, callOptions);
1✔
889
      }
890
      syncContext.execute(new Runnable() {
1✔
891
        @Override
892
        public void run() {
893
          exitIdleMode();
1✔
894
        }
1✔
895
      });
896
      if (configSelector.get() != INITIAL_PENDING_SELECTOR) {
1✔
897
        // This is an optimization for the case (typically with InProcessTransport) when name
898
        // resolution result is immediately available at this point. Otherwise, some users'
899
        // tests might observe slight behavior difference from earlier grpc versions.
900
        return newClientCall(method, callOptions);
1✔
901
      }
902
      if (shutdown.get()) {
1✔
903
        // Return a failing ClientCall.
904
        return new ClientCall<ReqT, RespT>() {
×
905
          @Override
906
          public void start(Listener<RespT> responseListener, Metadata headers) {
907
            responseListener.onClose(SHUTDOWN_STATUS, new Metadata());
×
908
          }
×
909

910
          @Override public void request(int numMessages) {}
×
911

912
          @Override public void cancel(@Nullable String message, @Nullable Throwable cause) {}
×
913

914
          @Override public void halfClose() {}
×
915

916
          @Override public void sendMessage(ReqT message) {}
×
917
        };
918
      }
919
      Context context = Context.current();
1✔
920
      final PendingCall<ReqT, RespT> pendingCall = new PendingCall<>(context, method, callOptions);
1✔
921
      syncContext.execute(new Runnable() {
1✔
922
        @Override
923
        public void run() {
924
          if (configSelector.get() == INITIAL_PENDING_SELECTOR) {
1✔
925
            if (pendingCalls == null) {
1✔
926
              pendingCalls = new LinkedHashSet<>();
1✔
927
              inUseStateAggregator.updateObjectInUse(pendingCallsInUseObject, true);
1✔
928
            }
929
            pendingCalls.add(pendingCall);
1✔
930
          } else {
931
            pendingCall.reprocess();
1✔
932
          }
933
        }
1✔
934
      });
935
      return pendingCall;
1✔
936
    }
937

938
    // Must run in SynchronizationContext.
939
    void updateConfigSelector(@Nullable InternalConfigSelector config) {
940
      InternalConfigSelector prevConfig = configSelector.get();
1✔
941
      configSelector.set(config);
1✔
942
      if (prevConfig == INITIAL_PENDING_SELECTOR && pendingCalls != null) {
1✔
943
        for (RealChannel.PendingCall<?, ?> pendingCall : pendingCalls) {
1✔
944
          pendingCall.reprocess();
1✔
945
        }
1✔
946
      }
947
    }
1✔
948

949
    // Must run in SynchronizationContext.
950
    void onConfigError() {
951
      if (configSelector.get() == INITIAL_PENDING_SELECTOR) {
1✔
952
        // Apply Default Service Config if initial name resolution fails.
953
        if (defaultServiceConfig != null) {
1✔
954
          updateConfigSelector(defaultServiceConfig.getDefaultConfigSelector());
1✔
955
          lastServiceConfig = defaultServiceConfig;
1✔
956
          channelLogger.log(ChannelLogLevel.ERROR,
1✔
957
              "Initial Name Resolution error, using default service config");
958
        } else {
959
          updateConfigSelector(null);
1✔
960
        }
961
      }
962
    }
1✔
963

964
    void shutdown() {
965
      final class RealChannelShutdown implements Runnable {
1✔
966
        @Override
967
        public void run() {
968
          if (pendingCalls == null) {
1✔
969
            if (configSelector.get() == INITIAL_PENDING_SELECTOR) {
1✔
970
              configSelector.set(null);
1✔
971
            }
972
            uncommittedRetriableStreamsRegistry.onShutdown(SHUTDOWN_STATUS);
1✔
973
          }
974
        }
1✔
975
      }
976

977
      syncContext.execute(new RealChannelShutdown());
1✔
978
    }
1✔
979

980
    void shutdownNow() {
981
      final class RealChannelShutdownNow implements Runnable {
1✔
982
        @Override
983
        public void run() {
984
          if (configSelector.get() == INITIAL_PENDING_SELECTOR) {
1✔
985
            configSelector.set(null);
1✔
986
          }
987
          if (pendingCalls != null) {
1✔
988
            for (RealChannel.PendingCall<?, ?> pendingCall : pendingCalls) {
1✔
989
              pendingCall.cancel("Channel is forcefully shutdown", null);
1✔
990
            }
1✔
991
          }
992
          uncommittedRetriableStreamsRegistry.onShutdownNow(SHUTDOWN_NOW_STATUS);
1✔
993
        }
1✔
994
      }
995

996
      syncContext.execute(new RealChannelShutdownNow());
1✔
997
    }
1✔
998

999
    @Override
1000
    public String authority() {
1001
      return authority;
1✔
1002
    }
1003

1004
    private final class PendingCall<ReqT, RespT> extends DelayedClientCall<ReqT, RespT> {
1005
      final Context context;
1006
      final MethodDescriptor<ReqT, RespT> method;
1007
      final CallOptions callOptions;
1008
      private final long callCreationTime;
1009

1010
      PendingCall(
1011
          Context context, MethodDescriptor<ReqT, RespT> method, CallOptions callOptions) {
1✔
1012
        super(getCallExecutor(callOptions), scheduledExecutor, callOptions.getDeadline());
1✔
1013
        this.context = context;
1✔
1014
        this.method = method;
1✔
1015
        this.callOptions = callOptions;
1✔
1016
        this.callCreationTime = ticker.nanoTime();
1✔
1017
      }
1✔
1018

1019
      /** Called when it's ready to create a real call and reprocess the pending call. */
1020
      void reprocess() {
1021
        ClientCall<ReqT, RespT> realCall;
1022
        Context previous = context.attach();
1✔
1023
        try {
1024
          CallOptions delayResolutionOption = callOptions.withOption(NAME_RESOLUTION_DELAYED,
1✔
1025
              ticker.nanoTime() - callCreationTime);
1✔
1026
          realCall = newClientCall(method, delayResolutionOption);
1✔
1027
        } finally {
1028
          context.detach(previous);
1✔
1029
        }
1030
        Runnable toRun = setCall(realCall);
1✔
1031
        if (toRun == null) {
1✔
1032
          syncContext.execute(new PendingCallRemoval());
1✔
1033
        } else {
1034
          getCallExecutor(callOptions).execute(new Runnable() {
1✔
1035
            @Override
1036
            public void run() {
1037
              toRun.run();
1✔
1038
              syncContext.execute(new PendingCallRemoval());
1✔
1039
            }
1✔
1040
          });
1041
        }
1042
      }
1✔
1043

1044
      @Override
1045
      protected void callCancelled() {
1046
        super.callCancelled();
1✔
1047
        syncContext.execute(new PendingCallRemoval());
1✔
1048
      }
1✔
1049

1050
      final class PendingCallRemoval implements Runnable {
1✔
1051
        @Override
1052
        public void run() {
1053
          if (pendingCalls != null) {
1✔
1054
            pendingCalls.remove(PendingCall.this);
1✔
1055
            if (pendingCalls.isEmpty()) {
1✔
1056
              inUseStateAggregator.updateObjectInUse(pendingCallsInUseObject, false);
1✔
1057
              pendingCalls = null;
1✔
1058
              if (shutdown.get()) {
1✔
1059
                uncommittedRetriableStreamsRegistry.onShutdown(SHUTDOWN_STATUS);
1✔
1060
              }
1061
            }
1062
          }
1063
        }
1✔
1064
      }
1065
    }
1066

1067
    private <ReqT, RespT> ClientCall<ReqT, RespT> newClientCall(
1068
        MethodDescriptor<ReqT, RespT> method, CallOptions callOptions) {
1069
      InternalConfigSelector selector = configSelector.get();
1✔
1070
      if (selector == null) {
1✔
1071
        return clientCallImplChannel.newCall(method, callOptions);
1✔
1072
      }
1073
      if (selector instanceof ServiceConfigConvertedSelector) {
1✔
1074
        MethodInfo methodInfo =
1✔
1075
            ((ServiceConfigConvertedSelector) selector).config.getMethodConfig(method);
1✔
1076
        if (methodInfo != null) {
1✔
1077
          callOptions = callOptions.withOption(MethodInfo.KEY, methodInfo);
1✔
1078
        }
1079
        return clientCallImplChannel.newCall(method, callOptions);
1✔
1080
      }
1081
      return new ConfigSelectingClientCall<>(
1✔
1082
          selector, clientCallImplChannel, executor, method, callOptions);
1✔
1083
    }
1084
  }
1085

1086
  /**
1087
   * A client call for a given channel that applies a given config selector when it starts.
1088
   */
1089
  static final class ConfigSelectingClientCall<ReqT, RespT>
1090
      extends ForwardingClientCall<ReqT, RespT> {
1091

1092
    private final InternalConfigSelector configSelector;
1093
    private final Channel channel;
1094
    private final Executor callExecutor;
1095
    private final MethodDescriptor<ReqT, RespT> method;
1096
    private final Context context;
1097
    private CallOptions callOptions;
1098

1099
    private ClientCall<ReqT, RespT> delegate;
1100

1101
    ConfigSelectingClientCall(
1102
        InternalConfigSelector configSelector, Channel channel, Executor channelExecutor,
1103
        MethodDescriptor<ReqT, RespT> method,
1104
        CallOptions callOptions) {
1✔
1105
      this.configSelector = configSelector;
1✔
1106
      this.channel = channel;
1✔
1107
      this.method = method;
1✔
1108
      this.callExecutor =
1✔
1109
          callOptions.getExecutor() == null ? channelExecutor : callOptions.getExecutor();
1✔
1110
      this.callOptions = callOptions.withExecutor(callExecutor);
1✔
1111
      this.context = Context.current();
1✔
1112
    }
1✔
1113

1114
    @Override
1115
    protected ClientCall<ReqT, RespT> delegate() {
1116
      return delegate;
1✔
1117
    }
1118

1119
    @SuppressWarnings("unchecked")
1120
    @Override
1121
    public void start(Listener<RespT> observer, Metadata headers) {
1122
      PickSubchannelArgs args =
1✔
1123
          new PickSubchannelArgsImpl(method, headers, callOptions, NOOP_PICK_DETAILS_CONSUMER);
1✔
1124
      InternalConfigSelector.Result result = configSelector.selectConfig(args);
1✔
1125
      Status status = result.getStatus();
1✔
1126
      if (!status.isOk()) {
1✔
1127
        executeCloseObserverInContext(observer,
1✔
1128
            GrpcUtil.replaceInappropriateControlPlaneStatus(status));
1✔
1129
        delegate = (ClientCall<ReqT, RespT>) NOOP_CALL;
1✔
1130
        return;
1✔
1131
      }
1132
      ClientInterceptor interceptor = result.getInterceptor();
1✔
1133
      ManagedChannelServiceConfig config = (ManagedChannelServiceConfig) result.getConfig();
1✔
1134
      MethodInfo methodInfo = config.getMethodConfig(method);
1✔
1135
      if (methodInfo != null) {
1✔
1136
        callOptions = callOptions.withOption(MethodInfo.KEY, methodInfo);
1✔
1137
      }
1138
      if (interceptor != null) {
1✔
1139
        delegate = interceptor.interceptCall(method, callOptions, channel);
1✔
1140
      } else {
1141
        delegate = channel.newCall(method, callOptions);
×
1142
      }
1143
      delegate.start(observer, headers);
1✔
1144
    }
1✔
1145

1146
    private void executeCloseObserverInContext(
1147
        final Listener<RespT> observer, final Status status) {
1148
      class CloseInContext extends ContextRunnable {
1149
        CloseInContext() {
1✔
1150
          super(context);
1✔
1151
        }
1✔
1152

1153
        @Override
1154
        public void runInContext() {
1155
          observer.onClose(status, new Metadata());
1✔
1156
        }
1✔
1157
      }
1158

1159
      callExecutor.execute(new CloseInContext());
1✔
1160
    }
1✔
1161

1162
    @Override
1163
    public void cancel(@Nullable String message, @Nullable Throwable cause) {
1164
      if (delegate != null) {
×
1165
        delegate.cancel(message, cause);
×
1166
      }
1167
    }
×
1168
  }
1169

1170
  private static final ClientCall<Object, Object> NOOP_CALL = new ClientCall<Object, Object>() {
1✔
1171
    @Override
1172
    public void start(Listener<Object> responseListener, Metadata headers) {}
×
1173

1174
    @Override
1175
    public void request(int numMessages) {}
1✔
1176

1177
    @Override
1178
    public void cancel(String message, Throwable cause) {}
×
1179

1180
    @Override
1181
    public void halfClose() {}
×
1182

1183
    @Override
1184
    public void sendMessage(Object message) {}
×
1185

1186
    // Always returns {@code false}, since this is only used when the startup of the call fails.
1187
    @Override
1188
    public boolean isReady() {
1189
      return false;
×
1190
    }
1191
  };
1192

1193
  /**
1194
   * Terminate the channel if termination conditions are met.
1195
   */
1196
  // Must be run from syncContext
1197
  private void maybeTerminateChannel() {
1198
    if (terminated) {
1✔
1199
      return;
×
1200
    }
1201
    if (shutdown.get() && subchannels.isEmpty() && oobChannels.isEmpty()) {
1✔
1202
      channelLogger.log(ChannelLogLevel.INFO, "Terminated");
1✔
1203
      channelz.removeRootChannel(this);
1✔
1204
      executorPool.returnObject(executor);
1✔
1205
      balancerRpcExecutorHolder.release();
1✔
1206
      offloadExecutorHolder.release();
1✔
1207
      // Release the transport factory so that it can deallocate any resources.
1208
      transportFactory.close();
1✔
1209

1210
      terminated = true;
1✔
1211
      terminatedLatch.countDown();
1✔
1212
    }
1213
  }
1✔
1214

1215
  // Must be called from syncContext
1216
  private void handleInternalSubchannelState(ConnectivityStateInfo newState) {
1217
    if (newState.getState() == TRANSIENT_FAILURE || newState.getState() == IDLE) {
1✔
1218
      refreshNameResolution();
1✔
1219
    }
1220
  }
1✔
1221

1222
  @Override
1223
  @SuppressWarnings("deprecation")
1224
  public ConnectivityState getState(boolean requestConnection) {
1225
    ConnectivityState savedChannelState = channelStateManager.getState();
1✔
1226
    if (requestConnection && savedChannelState == IDLE) {
1✔
1227
      final class RequestConnection implements Runnable {
1✔
1228
        @Override
1229
        public void run() {
1230
          exitIdleMode();
1✔
1231
          if (subchannelPicker != null) {
1✔
1232
            subchannelPicker.requestConnection();
1✔
1233
          }
1234
          if (lbHelper != null) {
1✔
1235
            lbHelper.lb.requestConnection();
1✔
1236
          }
1237
        }
1✔
1238
      }
1239

1240
      syncContext.execute(new RequestConnection());
1✔
1241
    }
1242
    return savedChannelState;
1✔
1243
  }
1244

1245
  @Override
1246
  public void notifyWhenStateChanged(final ConnectivityState source, final Runnable callback) {
1247
    final class NotifyStateChanged implements Runnable {
1✔
1248
      @Override
1249
      public void run() {
1250
        channelStateManager.notifyWhenStateChanged(callback, executor, source);
1✔
1251
      }
1✔
1252
    }
1253

1254
    syncContext.execute(new NotifyStateChanged());
1✔
1255
  }
1✔
1256

1257
  @Override
1258
  public void resetConnectBackoff() {
1259
    final class ResetConnectBackoff implements Runnable {
1✔
1260
      @Override
1261
      public void run() {
1262
        if (shutdown.get()) {
1✔
1263
          return;
1✔
1264
        }
1265
        if (nameResolverStarted) {
1✔
1266
          refreshNameResolution();
1✔
1267
        }
1268
        for (InternalSubchannel subchannel : subchannels) {
1✔
1269
          subchannel.resetConnectBackoff();
1✔
1270
        }
1✔
1271
        for (OobChannel oobChannel : oobChannels) {
1✔
1272
          oobChannel.resetConnectBackoff();
×
1273
        }
×
1274
      }
1✔
1275
    }
1276

1277
    syncContext.execute(new ResetConnectBackoff());
1✔
1278
  }
1✔
1279

1280
  @Override
1281
  public void enterIdle() {
1282
    final class PrepareToLoseNetworkRunnable implements Runnable {
1✔
1283
      @Override
1284
      public void run() {
1285
        if (shutdown.get() || lbHelper == null) {
1✔
1286
          return;
1✔
1287
        }
1288
        cancelIdleTimer(/* permanent= */ false);
1✔
1289
        enterIdleMode();
1✔
1290
      }
1✔
1291
    }
1292

1293
    syncContext.execute(new PrepareToLoseNetworkRunnable());
1✔
1294
  }
1✔
1295

1296
  /**
1297
   * A registry that prevents channel shutdown from killing existing retry attempts that are in
1298
   * backoff.
1299
   */
1300
  private final class UncommittedRetriableStreamsRegistry {
1✔
1301
    // TODO(zdapeng): This means we would acquire a lock for each new retry-able stream,
1302
    // it's worthwhile to look for a lock-free approach.
1303
    final Object lock = new Object();
1✔
1304

1305
    @GuardedBy("lock")
1✔
1306
    Collection<ClientStream> uncommittedRetriableStreams = new HashSet<>();
1307

1308
    @GuardedBy("lock")
1309
    Status shutdownStatus;
1310

1311
    void onShutdown(Status reason) {
1312
      boolean shouldShutdownDelayedTransport = false;
1✔
1313
      synchronized (lock) {
1✔
1314
        if (shutdownStatus != null) {
1✔
1315
          return;
1✔
1316
        }
1317
        shutdownStatus = reason;
1✔
1318
        // Keep the delayedTransport open until there is no more uncommitted streams, b/c those
1319
        // retriable streams, which may be in backoff and not using any transport, are already
1320
        // started RPCs.
1321
        if (uncommittedRetriableStreams.isEmpty()) {
1✔
1322
          shouldShutdownDelayedTransport = true;
1✔
1323
        }
1324
      }
1✔
1325

1326
      if (shouldShutdownDelayedTransport) {
1✔
1327
        delayedTransport.shutdown(reason);
1✔
1328
      }
1329
    }
1✔
1330

1331
    void onShutdownNow(Status reason) {
1332
      onShutdown(reason);
1✔
1333
      Collection<ClientStream> streams;
1334

1335
      synchronized (lock) {
1✔
1336
        streams = new ArrayList<>(uncommittedRetriableStreams);
1✔
1337
      }
1✔
1338

1339
      for (ClientStream stream : streams) {
1✔
1340
        stream.cancel(reason);
1✔
1341
      }
1✔
1342
      delayedTransport.shutdownNow(reason);
1✔
1343
    }
1✔
1344

1345
    /**
1346
     * Registers a RetriableStream and return null if not shutdown, otherwise just returns the
1347
     * shutdown Status.
1348
     */
1349
    @Nullable
1350
    Status add(RetriableStream<?> retriableStream) {
1351
      synchronized (lock) {
1✔
1352
        if (shutdownStatus != null) {
1✔
1353
          return shutdownStatus;
1✔
1354
        }
1355
        uncommittedRetriableStreams.add(retriableStream);
1✔
1356
        return null;
1✔
1357
      }
1358
    }
1359

1360
    void remove(RetriableStream<?> retriableStream) {
1361
      Status shutdownStatusCopy = null;
1✔
1362

1363
      synchronized (lock) {
1✔
1364
        uncommittedRetriableStreams.remove(retriableStream);
1✔
1365
        if (uncommittedRetriableStreams.isEmpty()) {
1✔
1366
          shutdownStatusCopy = shutdownStatus;
1✔
1367
          // Because retriable transport is long-lived, we take this opportunity to down-size the
1368
          // hashmap.
1369
          uncommittedRetriableStreams = new HashSet<>();
1✔
1370
        }
1371
      }
1✔
1372

1373
      if (shutdownStatusCopy != null) {
1✔
1374
        delayedTransport.shutdown(shutdownStatusCopy);
1✔
1375
      }
1376
    }
1✔
1377
  }
1378

1379
  private final class LbHelperImpl extends LoadBalancer.Helper {
1✔
1380
    AutoConfiguredLoadBalancer lb;
1381

1382
    @Override
1383
    public AbstractSubchannel createSubchannel(CreateSubchannelArgs args) {
1384
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1385
      // No new subchannel should be created after load balancer has been shutdown.
1386
      checkState(!terminating, "Channel is being terminated");
1✔
1387
      return new SubchannelImpl(args);
1✔
1388
    }
1389

1390
    @Override
1391
    public void updateBalancingState(
1392
        final ConnectivityState newState, final SubchannelPicker newPicker) {
1393
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1394
      checkNotNull(newState, "newState");
1✔
1395
      checkNotNull(newPicker, "newPicker");
1✔
1396

1397
      if (LbHelperImpl.this != lbHelper || panicMode) {
1✔
1398
        return;
1✔
1399
      }
1400
      updateSubchannelPicker(newPicker);
1✔
1401
      // It's not appropriate to report SHUTDOWN state from lb.
1402
      // Ignore the case of newState == SHUTDOWN for now.
1403
      if (newState != SHUTDOWN) {
1✔
1404
        channelLogger.log(
1✔
1405
            ChannelLogLevel.INFO, "Entering {0} state with picker: {1}", newState, newPicker);
1406
        channelStateManager.gotoState(newState);
1✔
1407
      }
1408
    }
1✔
1409

1410
    @Override
1411
    public void refreshNameResolution() {
1412
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1413
      final class LoadBalancerRefreshNameResolution implements Runnable {
1✔
1414
        @Override
1415
        public void run() {
1416
          ManagedChannelImpl.this.refreshNameResolution();
1✔
1417
        }
1✔
1418
      }
1419

1420
      syncContext.execute(new LoadBalancerRefreshNameResolution());
1✔
1421
    }
1✔
1422

1423
    @Override
1424
    public ManagedChannel createOobChannel(EquivalentAddressGroup addressGroup, String authority) {
1425
      return createOobChannel(Collections.singletonList(addressGroup), authority);
×
1426
    }
1427

1428
    @Override
1429
    public ManagedChannel createOobChannel(List<EquivalentAddressGroup> addressGroup,
1430
        String authority) {
1431
      // TODO(ejona): can we be even stricter? Like terminating?
1432
      checkState(!terminated, "Channel is terminated");
1✔
1433
      long oobChannelCreationTime = timeProvider.currentTimeNanos();
1✔
1434
      InternalLogId oobLogId = InternalLogId.allocate("OobChannel", /*details=*/ null);
1✔
1435
      InternalLogId subchannelLogId =
1✔
1436
          InternalLogId.allocate("Subchannel-OOB", /*details=*/ authority);
1✔
1437
      ChannelTracer oobChannelTracer =
1✔
1438
          new ChannelTracer(
1439
              oobLogId, maxTraceEvents, oobChannelCreationTime,
1✔
1440
              "OobChannel for " + addressGroup);
1441
      final OobChannel oobChannel = new OobChannel(
1✔
1442
          authority, balancerRpcExecutorPool, oobTransportFactory.getScheduledExecutorService(),
1✔
1443
          syncContext, callTracerFactory.create(), oobChannelTracer, channelz, timeProvider);
1✔
1444
      channelTracer.reportEvent(new ChannelTrace.Event.Builder()
1✔
1445
          .setDescription("Child OobChannel created")
1✔
1446
          .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
1✔
1447
          .setTimestampNanos(oobChannelCreationTime)
1✔
1448
          .setChannelRef(oobChannel)
1✔
1449
          .build());
1✔
1450
      ChannelTracer subchannelTracer =
1✔
1451
          new ChannelTracer(subchannelLogId, maxTraceEvents, oobChannelCreationTime,
1✔
1452
              "Subchannel for " + addressGroup);
1453
      ChannelLogger subchannelLogger = new ChannelLoggerImpl(subchannelTracer, timeProvider);
1✔
1454
      final class ManagedOobChannelCallback extends InternalSubchannel.Callback {
1✔
1455
        @Override
1456
        void onTerminated(InternalSubchannel is) {
1457
          oobChannels.remove(oobChannel);
1✔
1458
          channelz.removeSubchannel(is);
1✔
1459
          oobChannel.handleSubchannelTerminated();
1✔
1460
          maybeTerminateChannel();
1✔
1461
        }
1✔
1462

1463
        @Override
1464
        void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) {
1465
          // TODO(chengyuanzhang): change to let LB policies explicitly manage OOB channel's
1466
          //  state and refresh name resolution if necessary.
1467
          handleInternalSubchannelState(newState);
1✔
1468
          oobChannel.handleSubchannelStateChange(newState);
1✔
1469
        }
1✔
1470
      }
1471

1472
      final InternalSubchannel internalSubchannel = new InternalSubchannel(
1✔
1473
          CreateSubchannelArgs.newBuilder().setAddresses(addressGroup).build(),
1✔
1474
          authority, userAgent, backoffPolicyProvider, oobTransportFactory,
1✔
1475
          oobTransportFactory.getScheduledExecutorService(), stopwatchSupplier, syncContext,
1✔
1476
          // All callback methods are run from syncContext
1477
          new ManagedOobChannelCallback(),
1478
          channelz,
1✔
1479
          callTracerFactory.create(),
1✔
1480
          subchannelTracer,
1481
          subchannelLogId,
1482
          subchannelLogger,
1483
          transportFilters);
1✔
1484
      oobChannelTracer.reportEvent(new ChannelTrace.Event.Builder()
1✔
1485
          .setDescription("Child Subchannel created")
1✔
1486
          .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
1✔
1487
          .setTimestampNanos(oobChannelCreationTime)
1✔
1488
          .setSubchannelRef(internalSubchannel)
1✔
1489
          .build());
1✔
1490
      channelz.addSubchannel(oobChannel);
1✔
1491
      channelz.addSubchannel(internalSubchannel);
1✔
1492
      oobChannel.setSubchannel(internalSubchannel);
1✔
1493
      final class AddOobChannel implements Runnable {
1✔
1494
        @Override
1495
        public void run() {
1496
          if (terminating) {
1✔
1497
            oobChannel.shutdown();
×
1498
          }
1499
          if (!terminated) {
1✔
1500
            // If channel has not terminated, it will track the subchannel and block termination
1501
            // for it.
1502
            oobChannels.add(oobChannel);
1✔
1503
          }
1504
        }
1✔
1505
      }
1506

1507
      syncContext.execute(new AddOobChannel());
1✔
1508
      return oobChannel;
1✔
1509
    }
1510

1511
    @Deprecated
1512
    @Override
1513
    public ManagedChannelBuilder<?> createResolvingOobChannelBuilder(String target) {
1514
      return createResolvingOobChannelBuilder(target, new DefaultChannelCreds())
1✔
1515
          // Override authority to keep the old behavior.
1516
          // createResolvingOobChannelBuilder(String target) will be deleted soon.
1517
          .overrideAuthority(getAuthority());
1✔
1518
    }
1519

1520
    // TODO(creamsoup) prevent main channel to shutdown if oob channel is not terminated
1521
    // TODO(zdapeng) register the channel as a subchannel of the parent channel in channelz.
1522
    @Override
1523
    public ManagedChannelBuilder<?> createResolvingOobChannelBuilder(
1524
        final String target, final ChannelCredentials channelCreds) {
1525
      checkNotNull(channelCreds, "channelCreds");
1✔
1526

1527
      final class ResolvingOobChannelBuilder
1528
          extends ForwardingChannelBuilder2<ResolvingOobChannelBuilder> {
1529
        final ManagedChannelBuilder<?> delegate;
1530

1531
        ResolvingOobChannelBuilder() {
1✔
1532
          final ClientTransportFactory transportFactory;
1533
          CallCredentials callCredentials;
1534
          if (channelCreds instanceof DefaultChannelCreds) {
1✔
1535
            transportFactory = originalTransportFactory;
1✔
1536
            callCredentials = null;
1✔
1537
          } else {
1538
            SwapChannelCredentialsResult swapResult =
1✔
1539
                originalTransportFactory.swapChannelCredentials(channelCreds);
1✔
1540
            if (swapResult == null) {
1✔
1541
              delegate = Grpc.newChannelBuilder(target, channelCreds);
×
1542
              return;
×
1543
            } else {
1544
              transportFactory = swapResult.transportFactory;
1✔
1545
              callCredentials = swapResult.callCredentials;
1✔
1546
            }
1547
          }
1548
          ClientTransportFactoryBuilder transportFactoryBuilder =
1✔
1549
              new ClientTransportFactoryBuilder() {
1✔
1550
                @Override
1551
                public ClientTransportFactory buildClientTransportFactory() {
1552
                  return transportFactory;
1✔
1553
                }
1554
              };
1555
          delegate = new ManagedChannelImplBuilder(
1✔
1556
              target,
1557
              channelCreds,
1558
              callCredentials,
1559
              transportFactoryBuilder,
1560
              new FixedPortProvider(nameResolverArgs.getDefaultPort()))
1✔
1561
              .nameResolverRegistry(nameResolverRegistry);
1✔
1562
        }
1✔
1563

1564
        @Override
1565
        protected ManagedChannelBuilder<?> delegate() {
1566
          return delegate;
1✔
1567
        }
1568
      }
1569

1570
      checkState(!terminated, "Channel is terminated");
1✔
1571

1572
      @SuppressWarnings("deprecation")
1573
      ResolvingOobChannelBuilder builder = new ResolvingOobChannelBuilder();
1✔
1574

1575
      return builder
1✔
1576
          // TODO(zdapeng): executors should not outlive the parent channel.
1577
          .executor(executor)
1✔
1578
          .offloadExecutor(offloadExecutorHolder.getExecutor())
1✔
1579
          .maxTraceEvents(maxTraceEvents)
1✔
1580
          .proxyDetector(nameResolverArgs.getProxyDetector())
1✔
1581
          .userAgent(userAgent);
1✔
1582
    }
1583

1584
    @Override
1585
    public ChannelCredentials getUnsafeChannelCredentials() {
1586
      if (originalChannelCreds == null) {
1✔
1587
        return new DefaultChannelCreds();
1✔
1588
      }
1589
      return originalChannelCreds;
×
1590
    }
1591

1592
    @Override
1593
    public void updateOobChannelAddresses(ManagedChannel channel, EquivalentAddressGroup eag) {
1594
      updateOobChannelAddresses(channel, Collections.singletonList(eag));
×
1595
    }
×
1596

1597
    @Override
1598
    public void updateOobChannelAddresses(ManagedChannel channel,
1599
        List<EquivalentAddressGroup> eag) {
1600
      checkArgument(channel instanceof OobChannel,
1✔
1601
          "channel must have been returned from createOobChannel");
1602
      ((OobChannel) channel).updateAddresses(eag);
1✔
1603
    }
1✔
1604

1605
    @Override
1606
    public String getAuthority() {
1607
      return ManagedChannelImpl.this.authority();
1✔
1608
    }
1609

1610
    @Override
1611
    public String getChannelTarget() {
1612
      return targetUri.toString();
1✔
1613
    }
1614

1615
    @Override
1616
    public SynchronizationContext getSynchronizationContext() {
1617
      return syncContext;
1✔
1618
    }
1619

1620
    @Override
1621
    public ScheduledExecutorService getScheduledExecutorService() {
1622
      return scheduledExecutor;
1✔
1623
    }
1624

1625
    @Override
1626
    public ChannelLogger getChannelLogger() {
1627
      return channelLogger;
1✔
1628
    }
1629

1630
    @Override
1631
    public NameResolver.Args getNameResolverArgs() {
1632
      return nameResolverArgs;
1✔
1633
    }
1634

1635
    @Override
1636
    public NameResolverRegistry getNameResolverRegistry() {
1637
      return nameResolverRegistry;
1✔
1638
    }
1639

1640
    @Override
1641
    public MetricRecorder getMetricRecorder() {
1642
      return metricRecorder;
1✔
1643
    }
1644

1645
    /**
1646
     * A placeholder for channel creds if user did not specify channel creds for the channel.
1647
     */
1648
    // TODO(zdapeng): get rid of this class and let all ChannelBuilders always provide a non-null
1649
    //     channel creds.
1650
    final class DefaultChannelCreds extends ChannelCredentials {
1✔
1651
      @Override
1652
      public ChannelCredentials withoutBearerTokens() {
1653
        return this;
×
1654
      }
1655
    }
1656
  }
1657

1658
  final class NameResolverListener extends NameResolver.Listener2 {
1659
    final LbHelperImpl helper;
1660
    final NameResolver resolver;
1661

1662
    NameResolverListener(LbHelperImpl helperImpl, NameResolver resolver) {
1✔
1663
      this.helper = checkNotNull(helperImpl, "helperImpl");
1✔
1664
      this.resolver = checkNotNull(resolver, "resolver");
1✔
1665
    }
1✔
1666

1667
    @Override
1668
    public void onResult(final ResolutionResult resolutionResult) {
1669
      final class NamesResolved implements Runnable {
1✔
1670

1671
        @Override
1672
        public void run() {
1673
          Status status = onResult2(resolutionResult);
1✔
1674
          ResolutionResultListener resolutionResultListener = resolutionResult.getAttributes()
1✔
1675
              .get(RetryingNameResolver.RESOLUTION_RESULT_LISTENER_KEY);
1✔
1676
          resolutionResultListener.resolutionAttempted(status);
1✔
1677
        }
1✔
1678
      }
1679

1680
      syncContext.execute(new NamesResolved());
1✔
1681
    }
1✔
1682

1683
    @SuppressWarnings("ReferenceEquality")
1684
    @Override
1685
    public Status onResult2(final ResolutionResult resolutionResult) {
1686
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1687
      if (ManagedChannelImpl.this.nameResolver != resolver) {
1✔
1688
        return Status.OK;
1✔
1689
      }
1690

1691
      StatusOr<List<EquivalentAddressGroup>> serversOrError =
1✔
1692
          resolutionResult.getAddressesOrError();
1✔
1693
      if (!serversOrError.hasValue()) {
1✔
1694
        handleErrorInSyncContext(serversOrError.getStatus());
1✔
1695
        return serversOrError.getStatus();
1✔
1696
      }
1697
      List<EquivalentAddressGroup> servers = serversOrError.getValue();
1✔
1698
      channelLogger.log(
1✔
1699
          ChannelLogLevel.DEBUG,
1700
          "Resolved address: {0}, config={1}",
1701
          servers,
1702
          resolutionResult.getAttributes());
1✔
1703

1704
      if (lastResolutionState != ResolutionState.SUCCESS) {
1✔
1705
        channelLogger.log(ChannelLogLevel.INFO, "Address resolved: {0}",
1✔
1706
            servers);
1707
        lastResolutionState = ResolutionState.SUCCESS;
1✔
1708
      }
1709
      ConfigOrError configOrError = resolutionResult.getServiceConfig();
1✔
1710
      InternalConfigSelector resolvedConfigSelector =
1✔
1711
          resolutionResult.getAttributes().get(InternalConfigSelector.KEY);
1✔
1712
      ManagedChannelServiceConfig validServiceConfig =
1713
          configOrError != null && configOrError.getConfig() != null
1✔
1714
              ? (ManagedChannelServiceConfig) configOrError.getConfig()
1✔
1715
              : null;
1✔
1716
      Status serviceConfigError = configOrError != null ? configOrError.getError() : null;
1✔
1717

1718
      ManagedChannelServiceConfig effectiveServiceConfig;
1719
      if (!lookUpServiceConfig) {
1✔
1720
        if (validServiceConfig != null) {
1✔
1721
          channelLogger.log(
1✔
1722
              ChannelLogLevel.INFO,
1723
              "Service config from name resolver discarded by channel settings");
1724
        }
1725
        effectiveServiceConfig =
1726
            defaultServiceConfig == null ? EMPTY_SERVICE_CONFIG : defaultServiceConfig;
1✔
1727
        if (resolvedConfigSelector != null) {
1✔
1728
          channelLogger.log(
1✔
1729
              ChannelLogLevel.INFO,
1730
              "Config selector from name resolver discarded by channel settings");
1731
        }
1732
        realChannel.updateConfigSelector(effectiveServiceConfig.getDefaultConfigSelector());
1✔
1733
      } else {
1734
        // Try to use config if returned from name resolver
1735
        // Otherwise, try to use the default config if available
1736
        if (validServiceConfig != null) {
1✔
1737
          effectiveServiceConfig = validServiceConfig;
1✔
1738
          if (resolvedConfigSelector != null) {
1✔
1739
            realChannel.updateConfigSelector(resolvedConfigSelector);
1✔
1740
            if (effectiveServiceConfig.getDefaultConfigSelector() != null) {
1✔
1741
              channelLogger.log(
×
1742
                  ChannelLogLevel.DEBUG,
1743
                  "Method configs in service config will be discarded due to presence of"
1744
                      + "config-selector");
1745
            }
1746
          } else {
1747
            realChannel.updateConfigSelector(effectiveServiceConfig.getDefaultConfigSelector());
1✔
1748
          }
1749
        } else if (defaultServiceConfig != null) {
1✔
1750
          effectiveServiceConfig = defaultServiceConfig;
1✔
1751
          realChannel.updateConfigSelector(effectiveServiceConfig.getDefaultConfigSelector());
1✔
1752
          channelLogger.log(
1✔
1753
              ChannelLogLevel.INFO,
1754
              "Received no service config, using default service config");
1755
        } else if (serviceConfigError != null) {
1✔
1756
          if (!serviceConfigUpdated) {
1✔
1757
            // First DNS lookup has invalid service config, and cannot fall back to default
1758
            channelLogger.log(
1✔
1759
                ChannelLogLevel.INFO,
1760
                "Fallback to error due to invalid first service config without default config");
1761
            // This error could be an "inappropriate" control plane error that should not bleed
1762
            // through to client code using gRPC. We let them flow through here to the LB as
1763
            // we later check for these error codes when investigating pick results in
1764
            // GrpcUtil.getTransportFromPickResult().
1765
            onError(configOrError.getError());
1✔
1766
            return configOrError.getError();
1✔
1767
          } else {
1768
            effectiveServiceConfig = lastServiceConfig;
1✔
1769
          }
1770
        } else {
1771
          effectiveServiceConfig = EMPTY_SERVICE_CONFIG;
1✔
1772
          realChannel.updateConfigSelector(null);
1✔
1773
        }
1774
        if (!effectiveServiceConfig.equals(lastServiceConfig)) {
1✔
1775
          channelLogger.log(
1✔
1776
              ChannelLogLevel.INFO,
1777
              "Service config changed{0}",
1778
              effectiveServiceConfig == EMPTY_SERVICE_CONFIG ? " to empty" : "");
1✔
1779
          lastServiceConfig = effectiveServiceConfig;
1✔
1780
          transportProvider.throttle = effectiveServiceConfig.getRetryThrottling();
1✔
1781
        }
1782

1783
        try {
1784
          // TODO(creamsoup): when `serversOrError` is empty and lastResolutionStateCopy == SUCCESS
1785
          //  and lbNeedAddress, it shouldn't call the handleServiceConfigUpdate. But,
1786
          //  lbNeedAddress is not deterministic
1787
          serviceConfigUpdated = true;
1✔
1788
        } catch (RuntimeException re) {
×
1789
          logger.log(
×
1790
              Level.WARNING,
1791
              "[" + getLogId() + "] Unexpected exception from parsing service config",
×
1792
              re);
1793
        }
1✔
1794
      }
1795

1796
      Attributes effectiveAttrs = resolutionResult.getAttributes();
1✔
1797
      // Call LB only if it's not shutdown.  If LB is shutdown, lbHelper won't match.
1798
      if (NameResolverListener.this.helper == ManagedChannelImpl.this.lbHelper) {
1✔
1799
        Attributes.Builder attrBuilder =
1✔
1800
            effectiveAttrs.toBuilder().discard(InternalConfigSelector.KEY);
1✔
1801
        Map<String, ?> healthCheckingConfig =
1✔
1802
            effectiveServiceConfig.getHealthCheckingConfig();
1✔
1803
        if (healthCheckingConfig != null) {
1✔
1804
          attrBuilder
1✔
1805
              .set(LoadBalancer.ATTR_HEALTH_CHECKING_CONFIG, healthCheckingConfig)
1✔
1806
              .build();
1✔
1807
        }
1808
        Attributes attributes = attrBuilder.build();
1✔
1809

1810
        ResolvedAddresses.Builder resolvedAddresses = ResolvedAddresses.newBuilder()
1✔
1811
            .setAddresses(serversOrError.getValue())
1✔
1812
            .setAttributes(attributes)
1✔
1813
            .setLoadBalancingPolicyConfig(effectiveServiceConfig.getLoadBalancingConfig());
1✔
1814
        Status addressAcceptanceStatus = helper.lb.tryAcceptResolvedAddresses(
1✔
1815
            resolvedAddresses.build());
1✔
1816
        return addressAcceptanceStatus;
1✔
1817
      }
1818
      return Status.OK;
×
1819
    }
1820

1821
    @Override
1822
    public void onError(final Status error) {
1823
      checkArgument(!error.isOk(), "the error status must not be OK");
1✔
1824
      final class NameResolverErrorHandler implements Runnable {
1✔
1825
        @Override
1826
        public void run() {
1827
          handleErrorInSyncContext(error);
1✔
1828
        }
1✔
1829
      }
1830

1831
      syncContext.execute(new NameResolverErrorHandler());
1✔
1832
    }
1✔
1833

1834
    private void handleErrorInSyncContext(Status error) {
1835
      logger.log(Level.WARNING, "[{0}] Failed to resolve name. status={1}",
1✔
1836
          new Object[] {getLogId(), error});
1✔
1837
      realChannel.onConfigError();
1✔
1838
      if (lastResolutionState != ResolutionState.ERROR) {
1✔
1839
        channelLogger.log(ChannelLogLevel.WARNING, "Failed to resolve name: {0}", error);
1✔
1840
        lastResolutionState = ResolutionState.ERROR;
1✔
1841
      }
1842
      // Call LB only if it's not shutdown.  If LB is shutdown, lbHelper won't match.
1843
      if (NameResolverListener.this.helper != ManagedChannelImpl.this.lbHelper) {
1✔
1844
        return;
1✔
1845
      }
1846

1847
      helper.lb.handleNameResolutionError(error);
1✔
1848
    }
1✔
1849
  }
1850

1851
  private final class SubchannelImpl extends AbstractSubchannel {
1852
    final CreateSubchannelArgs args;
1853
    final InternalLogId subchannelLogId;
1854
    final ChannelLoggerImpl subchannelLogger;
1855
    final ChannelTracer subchannelTracer;
1856
    List<EquivalentAddressGroup> addressGroups;
1857
    InternalSubchannel subchannel;
1858
    boolean started;
1859
    boolean shutdown;
1860
    ScheduledHandle delayedShutdownTask;
1861

1862
    SubchannelImpl(CreateSubchannelArgs args) {
1✔
1863
      checkNotNull(args, "args");
1✔
1864
      addressGroups = args.getAddresses();
1✔
1865
      if (authorityOverride != null) {
1✔
1866
        List<EquivalentAddressGroup> eagsWithoutOverrideAttr =
1✔
1867
            stripOverrideAuthorityAttributes(args.getAddresses());
1✔
1868
        args = args.toBuilder().setAddresses(eagsWithoutOverrideAttr).build();
1✔
1869
      }
1870
      this.args = args;
1✔
1871
      subchannelLogId = InternalLogId.allocate("Subchannel", /*details=*/ authority());
1✔
1872
      subchannelTracer = new ChannelTracer(
1✔
1873
          subchannelLogId, maxTraceEvents, timeProvider.currentTimeNanos(),
1✔
1874
          "Subchannel for " + args.getAddresses());
1✔
1875
      subchannelLogger = new ChannelLoggerImpl(subchannelTracer, timeProvider);
1✔
1876
    }
1✔
1877

1878
    @Override
1879
    public void start(final SubchannelStateListener listener) {
1880
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1881
      checkState(!started, "already started");
1✔
1882
      checkState(!shutdown, "already shutdown");
1✔
1883
      checkState(!terminating, "Channel is being terminated");
1✔
1884
      started = true;
1✔
1885
      final class ManagedInternalSubchannelCallback extends InternalSubchannel.Callback {
1✔
1886
        // All callbacks are run in syncContext
1887
        @Override
1888
        void onTerminated(InternalSubchannel is) {
1889
          subchannels.remove(is);
1✔
1890
          channelz.removeSubchannel(is);
1✔
1891
          maybeTerminateChannel();
1✔
1892
        }
1✔
1893

1894
        @Override
1895
        void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) {
1896
          checkState(listener != null, "listener is null");
1✔
1897
          listener.onSubchannelState(newState);
1✔
1898
        }
1✔
1899

1900
        @Override
1901
        void onInUse(InternalSubchannel is) {
1902
          inUseStateAggregator.updateObjectInUse(is, true);
1✔
1903
        }
1✔
1904

1905
        @Override
1906
        void onNotInUse(InternalSubchannel is) {
1907
          inUseStateAggregator.updateObjectInUse(is, false);
1✔
1908
        }
1✔
1909
      }
1910

1911
      final InternalSubchannel internalSubchannel = new InternalSubchannel(
1✔
1912
          args,
1913
          authority(),
1✔
1914
          userAgent,
1✔
1915
          backoffPolicyProvider,
1✔
1916
          transportFactory,
1✔
1917
          transportFactory.getScheduledExecutorService(),
1✔
1918
          stopwatchSupplier,
1✔
1919
          syncContext,
1920
          new ManagedInternalSubchannelCallback(),
1921
          channelz,
1✔
1922
          callTracerFactory.create(),
1✔
1923
          subchannelTracer,
1924
          subchannelLogId,
1925
          subchannelLogger,
1926
          transportFilters);
1✔
1927

1928
      channelTracer.reportEvent(new ChannelTrace.Event.Builder()
1✔
1929
          .setDescription("Child Subchannel started")
1✔
1930
          .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
1✔
1931
          .setTimestampNanos(timeProvider.currentTimeNanos())
1✔
1932
          .setSubchannelRef(internalSubchannel)
1✔
1933
          .build());
1✔
1934

1935
      this.subchannel = internalSubchannel;
1✔
1936
      channelz.addSubchannel(internalSubchannel);
1✔
1937
      subchannels.add(internalSubchannel);
1✔
1938
    }
1✔
1939

1940
    @Override
1941
    InternalInstrumented<ChannelStats> getInstrumentedInternalSubchannel() {
1942
      checkState(started, "not started");
1✔
1943
      return subchannel;
1✔
1944
    }
1945

1946
    @Override
1947
    public void shutdown() {
1948
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1949
      if (subchannel == null) {
1✔
1950
        // start() was not successful
1951
        shutdown = true;
×
1952
        return;
×
1953
      }
1954
      if (shutdown) {
1✔
1955
        if (terminating && delayedShutdownTask != null) {
1✔
1956
          // shutdown() was previously called when terminating == false, thus a delayed shutdown()
1957
          // was scheduled.  Now since terminating == true, We should expedite the shutdown.
1958
          delayedShutdownTask.cancel();
×
1959
          delayedShutdownTask = null;
×
1960
          // Will fall through to the subchannel.shutdown() at the end.
1961
        } else {
1962
          return;
1✔
1963
        }
1964
      } else {
1965
        shutdown = true;
1✔
1966
      }
1967
      // Add a delay to shutdown to deal with the race between 1) a transport being picked and
1968
      // newStream() being called on it, and 2) its Subchannel is shut down by LoadBalancer (e.g.,
1969
      // because of address change, or because LoadBalancer is shutdown by Channel entering idle
1970
      // mode). If (2) wins, the app will see a spurious error. We work around this by delaying
1971
      // shutdown of Subchannel for a few seconds here.
1972
      //
1973
      // TODO(zhangkun83): consider a better approach
1974
      // (https://github.com/grpc/grpc-java/issues/2562).
1975
      if (!terminating) {
1✔
1976
        final class ShutdownSubchannel implements Runnable {
1✔
1977
          @Override
1978
          public void run() {
1979
            subchannel.shutdown(SUBCHANNEL_SHUTDOWN_STATUS);
1✔
1980
          }
1✔
1981
        }
1982

1983
        delayedShutdownTask = syncContext.schedule(
1✔
1984
            new LogExceptionRunnable(new ShutdownSubchannel()),
1985
            SUBCHANNEL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS,
1986
            transportFactory.getScheduledExecutorService());
1✔
1987
        return;
1✔
1988
      }
1989
      // When terminating == true, no more real streams will be created. It's safe and also
1990
      // desirable to shutdown timely.
1991
      subchannel.shutdown(SHUTDOWN_STATUS);
1✔
1992
    }
1✔
1993

1994
    @Override
1995
    public void requestConnection() {
1996
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1997
      checkState(started, "not started");
1✔
1998
      subchannel.obtainActiveTransport();
1✔
1999
    }
1✔
2000

2001
    @Override
2002
    public List<EquivalentAddressGroup> getAllAddresses() {
2003
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
2004
      checkState(started, "not started");
1✔
2005
      return addressGroups;
1✔
2006
    }
2007

2008
    @Override
2009
    public Attributes getAttributes() {
2010
      return args.getAttributes();
1✔
2011
    }
2012

2013
    @Override
2014
    public String toString() {
2015
      return subchannelLogId.toString();
1✔
2016
    }
2017

2018
    @Override
2019
    public Channel asChannel() {
2020
      checkState(started, "not started");
1✔
2021
      return new SubchannelChannel(
1✔
2022
          subchannel, balancerRpcExecutorHolder.getExecutor(),
1✔
2023
          transportFactory.getScheduledExecutorService(),
1✔
2024
          callTracerFactory.create(),
1✔
2025
          new AtomicReference<InternalConfigSelector>(null));
2026
    }
2027

2028
    @Override
2029
    public Object getInternalSubchannel() {
2030
      checkState(started, "Subchannel is not started");
1✔
2031
      return subchannel;
1✔
2032
    }
2033

2034
    @Override
2035
    public ChannelLogger getChannelLogger() {
2036
      return subchannelLogger;
1✔
2037
    }
2038

2039
    @Override
2040
    public void updateAddresses(List<EquivalentAddressGroup> addrs) {
2041
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
2042
      addressGroups = addrs;
1✔
2043
      if (authorityOverride != null) {
1✔
2044
        addrs = stripOverrideAuthorityAttributes(addrs);
1✔
2045
      }
2046
      subchannel.updateAddresses(addrs);
1✔
2047
    }
1✔
2048

2049
    @Override
2050
    public Attributes getConnectedAddressAttributes() {
2051
      return subchannel.getConnectedAddressAttributes();
1✔
2052
    }
2053

2054
    private List<EquivalentAddressGroup> stripOverrideAuthorityAttributes(
2055
        List<EquivalentAddressGroup> eags) {
2056
      List<EquivalentAddressGroup> eagsWithoutOverrideAttr = new ArrayList<>();
1✔
2057
      for (EquivalentAddressGroup eag : eags) {
1✔
2058
        EquivalentAddressGroup eagWithoutOverrideAttr = new EquivalentAddressGroup(
1✔
2059
            eag.getAddresses(),
1✔
2060
            eag.getAttributes().toBuilder().discard(ATTR_AUTHORITY_OVERRIDE).build());
1✔
2061
        eagsWithoutOverrideAttr.add(eagWithoutOverrideAttr);
1✔
2062
      }
1✔
2063
      return Collections.unmodifiableList(eagsWithoutOverrideAttr);
1✔
2064
    }
2065
  }
2066

2067
  @Override
2068
  public String toString() {
2069
    return MoreObjects.toStringHelper(this)
1✔
2070
        .add("logId", logId.getId())
1✔
2071
        .add("target", target)
1✔
2072
        .toString();
1✔
2073
  }
2074

2075
  /**
2076
   * Called from syncContext.
2077
   */
2078
  private final class DelayedTransportListener implements ManagedClientTransport.Listener {
1✔
2079
    @Override
2080
    public void transportShutdown(Status s) {
2081
      checkState(shutdown.get(), "Channel must have been shut down");
1✔
2082
    }
1✔
2083

2084
    @Override
2085
    public void transportReady() {
2086
      // Don't care
2087
    }
×
2088

2089
    @Override
2090
    public Attributes filterTransport(Attributes attributes) {
2091
      return attributes;
×
2092
    }
2093

2094
    @Override
2095
    public void transportInUse(final boolean inUse) {
2096
      inUseStateAggregator.updateObjectInUse(delayedTransport, inUse);
1✔
2097
      if (inUse) {
1✔
2098
        // It's possible to be in idle mode while inUseStateAggregator is in-use, if one of the
2099
        // subchannels is in use. But we should never be in idle mode when delayed transport is in
2100
        // use.
2101
        exitIdleMode();
1✔
2102
      }
2103
    }
1✔
2104

2105
    @Override
2106
    public void transportTerminated() {
2107
      checkState(shutdown.get(), "Channel must have been shut down");
1✔
2108
      terminating = true;
1✔
2109
      shutdownNameResolverAndLoadBalancer(false);
1✔
2110
      // No need to call channelStateManager since we are already in SHUTDOWN state.
2111
      // Until LoadBalancer is shutdown, it may still create new subchannels.  We catch them
2112
      // here.
2113
      maybeShutdownNowSubchannels();
1✔
2114
      maybeTerminateChannel();
1✔
2115
    }
1✔
2116
  }
2117

2118
  /**
2119
   * Must be accessed from syncContext.
2120
   */
2121
  private final class IdleModeStateAggregator extends InUseStateAggregator<Object> {
1✔
2122
    @Override
2123
    protected void handleInUse() {
2124
      exitIdleMode();
1✔
2125
    }
1✔
2126

2127
    @Override
2128
    protected void handleNotInUse() {
2129
      if (shutdown.get()) {
1✔
2130
        return;
1✔
2131
      }
2132
      rescheduleIdleTimer();
1✔
2133
    }
1✔
2134
  }
2135

2136
  /**
2137
   * Lazily request for Executor from an executor pool.
2138
   * Also act as an Executor directly to simply run a cmd
2139
   */
2140
  @VisibleForTesting
2141
  static final class ExecutorHolder implements Executor {
2142
    private final ObjectPool<? extends Executor> pool;
2143
    private Executor executor;
2144

2145
    ExecutorHolder(ObjectPool<? extends Executor> executorPool) {
1✔
2146
      this.pool = checkNotNull(executorPool, "executorPool");
1✔
2147
    }
1✔
2148

2149
    synchronized Executor getExecutor() {
2150
      if (executor == null) {
1✔
2151
        executor = checkNotNull(pool.getObject(), "%s.getObject()", executor);
1✔
2152
      }
2153
      return executor;
1✔
2154
    }
2155

2156
    synchronized void release() {
2157
      if (executor != null) {
1✔
2158
        executor = pool.returnObject(executor);
1✔
2159
      }
2160
    }
1✔
2161

2162
    @Override
2163
    public void execute(Runnable command) {
2164
      getExecutor().execute(command);
1✔
2165
    }
1✔
2166
  }
2167

2168
  private static final class RestrictedScheduledExecutor implements ScheduledExecutorService {
2169
    final ScheduledExecutorService delegate;
2170

2171
    private RestrictedScheduledExecutor(ScheduledExecutorService delegate) {
1✔
2172
      this.delegate = checkNotNull(delegate, "delegate");
1✔
2173
    }
1✔
2174

2175
    @Override
2176
    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
2177
      return delegate.schedule(callable, delay, unit);
×
2178
    }
2179

2180
    @Override
2181
    public ScheduledFuture<?> schedule(Runnable cmd, long delay, TimeUnit unit) {
2182
      return delegate.schedule(cmd, delay, unit);
1✔
2183
    }
2184

2185
    @Override
2186
    public ScheduledFuture<?> scheduleAtFixedRate(
2187
        Runnable command, long initialDelay, long period, TimeUnit unit) {
2188
      return delegate.scheduleAtFixedRate(command, initialDelay, period, unit);
1✔
2189
    }
2190

2191
    @Override
2192
    public ScheduledFuture<?> scheduleWithFixedDelay(
2193
        Runnable command, long initialDelay, long delay, TimeUnit unit) {
2194
      return delegate.scheduleWithFixedDelay(command, initialDelay, delay, unit);
×
2195
    }
2196

2197
    @Override
2198
    public boolean awaitTermination(long timeout, TimeUnit unit)
2199
        throws InterruptedException {
2200
      return delegate.awaitTermination(timeout, unit);
×
2201
    }
2202

2203
    @Override
2204
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
2205
        throws InterruptedException {
2206
      return delegate.invokeAll(tasks);
×
2207
    }
2208

2209
    @Override
2210
    public <T> List<Future<T>> invokeAll(
2211
        Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
2212
        throws InterruptedException {
2213
      return delegate.invokeAll(tasks, timeout, unit);
×
2214
    }
2215

2216
    @Override
2217
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
2218
        throws InterruptedException, ExecutionException {
2219
      return delegate.invokeAny(tasks);
×
2220
    }
2221

2222
    @Override
2223
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
2224
        throws InterruptedException, ExecutionException, TimeoutException {
2225
      return delegate.invokeAny(tasks, timeout, unit);
×
2226
    }
2227

2228
    @Override
2229
    public boolean isShutdown() {
2230
      return delegate.isShutdown();
×
2231
    }
2232

2233
    @Override
2234
    public boolean isTerminated() {
2235
      return delegate.isTerminated();
×
2236
    }
2237

2238
    @Override
2239
    public void shutdown() {
2240
      throw new UnsupportedOperationException("Restricted: shutdown() is not allowed");
1✔
2241
    }
2242

2243
    @Override
2244
    public List<Runnable> shutdownNow() {
2245
      throw new UnsupportedOperationException("Restricted: shutdownNow() is not allowed");
1✔
2246
    }
2247

2248
    @Override
2249
    public <T> Future<T> submit(Callable<T> task) {
2250
      return delegate.submit(task);
×
2251
    }
2252

2253
    @Override
2254
    public Future<?> submit(Runnable task) {
2255
      return delegate.submit(task);
×
2256
    }
2257

2258
    @Override
2259
    public <T> Future<T> submit(Runnable task, T result) {
2260
      return delegate.submit(task, result);
×
2261
    }
2262

2263
    @Override
2264
    public void execute(Runnable command) {
2265
      delegate.execute(command);
×
2266
    }
×
2267
  }
2268

2269
  /**
   * A ResolutionState indicates the status of last name resolution.
   */
  enum ResolutionState {
    /** No resolution outcome has been recorded. */
    NO_RESOLUTION,
    /** The last recorded outcome was a successful resolution. */
    SUCCESS,
    /** The last recorded outcome was a resolution error. */
    ERROR
  }
2277
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc