• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #18716

pending completion
#18716

push

github-actions

web-flow
core: Apply RetryingNameResolver in ManagedChannelImpl (#10371) (#10374)

Wrapping the DnsNameResolver in DnsNameResolverProvider can cause
problems to external name resolvers that delegate to a DnsResolver
already wrapped in RetryingNameResolver. ManagedChannelImpl would
end up wrapping these name resolvers again, causing an exception
later from a RetryingNameResolver safeguard that checks for double
wrapping.

30942 of 35069 relevant lines covered (88.23%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.48
/../core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
1
/*
2
 * Copyright 2016 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static com.google.common.base.Preconditions.checkState;
22
import static io.grpc.ClientStreamTracer.NAME_RESOLUTION_DELAYED;
23
import static io.grpc.ConnectivityState.IDLE;
24
import static io.grpc.ConnectivityState.SHUTDOWN;
25
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
26
import static io.grpc.EquivalentAddressGroup.ATTR_AUTHORITY_OVERRIDE;
27

28
import com.google.common.annotations.VisibleForTesting;
29
import com.google.common.base.MoreObjects;
30
import com.google.common.base.Stopwatch;
31
import com.google.common.base.Supplier;
32
import com.google.common.util.concurrent.ListenableFuture;
33
import com.google.common.util.concurrent.SettableFuture;
34
import io.grpc.Attributes;
35
import io.grpc.CallCredentials;
36
import io.grpc.CallOptions;
37
import io.grpc.Channel;
38
import io.grpc.ChannelCredentials;
39
import io.grpc.ChannelLogger;
40
import io.grpc.ChannelLogger.ChannelLogLevel;
41
import io.grpc.ClientCall;
42
import io.grpc.ClientInterceptor;
43
import io.grpc.ClientInterceptors;
44
import io.grpc.ClientStreamTracer;
45
import io.grpc.CompressorRegistry;
46
import io.grpc.ConnectivityState;
47
import io.grpc.ConnectivityStateInfo;
48
import io.grpc.Context;
49
import io.grpc.DecompressorRegistry;
50
import io.grpc.EquivalentAddressGroup;
51
import io.grpc.ForwardingChannelBuilder;
52
import io.grpc.ForwardingClientCall;
53
import io.grpc.Grpc;
54
import io.grpc.InternalChannelz;
55
import io.grpc.InternalChannelz.ChannelStats;
56
import io.grpc.InternalChannelz.ChannelTrace;
57
import io.grpc.InternalConfigSelector;
58
import io.grpc.InternalInstrumented;
59
import io.grpc.InternalLogId;
60
import io.grpc.InternalWithLogId;
61
import io.grpc.LoadBalancer;
62
import io.grpc.LoadBalancer.CreateSubchannelArgs;
63
import io.grpc.LoadBalancer.PickResult;
64
import io.grpc.LoadBalancer.PickSubchannelArgs;
65
import io.grpc.LoadBalancer.ResolvedAddresses;
66
import io.grpc.LoadBalancer.SubchannelPicker;
67
import io.grpc.LoadBalancer.SubchannelStateListener;
68
import io.grpc.ManagedChannel;
69
import io.grpc.ManagedChannelBuilder;
70
import io.grpc.Metadata;
71
import io.grpc.MethodDescriptor;
72
import io.grpc.NameResolver;
73
import io.grpc.NameResolver.ConfigOrError;
74
import io.grpc.NameResolver.ResolutionResult;
75
import io.grpc.NameResolverRegistry;
76
import io.grpc.ProxyDetector;
77
import io.grpc.Status;
78
import io.grpc.SynchronizationContext;
79
import io.grpc.SynchronizationContext.ScheduledHandle;
80
import io.grpc.internal.AutoConfiguredLoadBalancerFactory.AutoConfiguredLoadBalancer;
81
import io.grpc.internal.ClientCallImpl.ClientStreamProvider;
82
import io.grpc.internal.ClientTransportFactory.SwapChannelCredentialsResult;
83
import io.grpc.internal.ManagedChannelImplBuilder.ClientTransportFactoryBuilder;
84
import io.grpc.internal.ManagedChannelImplBuilder.FixedPortProvider;
85
import io.grpc.internal.ManagedChannelServiceConfig.MethodInfo;
86
import io.grpc.internal.ManagedChannelServiceConfig.ServiceConfigConvertedSelector;
87
import io.grpc.internal.RetriableStream.ChannelBufferMeter;
88
import io.grpc.internal.RetriableStream.Throttle;
89
import io.grpc.internal.RetryingNameResolver.ResolutionResultListener;
90
import java.net.URI;
91
import java.net.URISyntaxException;
92
import java.util.ArrayList;
93
import java.util.Collection;
94
import java.util.Collections;
95
import java.util.HashSet;
96
import java.util.LinkedHashSet;
97
import java.util.List;
98
import java.util.Map;
99
import java.util.Set;
100
import java.util.concurrent.Callable;
101
import java.util.concurrent.CountDownLatch;
102
import java.util.concurrent.ExecutionException;
103
import java.util.concurrent.Executor;
104
import java.util.concurrent.Future;
105
import java.util.concurrent.ScheduledExecutorService;
106
import java.util.concurrent.ScheduledFuture;
107
import java.util.concurrent.TimeUnit;
108
import java.util.concurrent.TimeoutException;
109
import java.util.concurrent.atomic.AtomicBoolean;
110
import java.util.concurrent.atomic.AtomicReference;
111
import java.util.logging.Level;
112
import java.util.logging.Logger;
113
import java.util.regex.Pattern;
114
import javax.annotation.Nullable;
115
import javax.annotation.concurrent.GuardedBy;
116
import javax.annotation.concurrent.ThreadSafe;
117

118
/** A communication channel for making outgoing RPCs. */
119
@ThreadSafe
120
final class ManagedChannelImpl extends ManagedChannel implements
121
    InternalInstrumented<ChannelStats> {
122
  @VisibleForTesting
123
  static final Logger logger = Logger.getLogger(ManagedChannelImpl.class.getName());
1✔
124

125
  // Matching this pattern means the target string is a URI target or at least intended to be one.
126
  // A URI target must be an absolute hierarchical URI.
127
  // From RFC 2396: scheme = alpha *( alpha | digit | "+" | "-" | "." )
128
  @VisibleForTesting
129
  static final Pattern URI_PATTERN = Pattern.compile("[a-zA-Z][a-zA-Z0-9+.-]*:/.*");
1✔
130

131
  static final long IDLE_TIMEOUT_MILLIS_DISABLE = -1;
132

133
  static final long SUBCHANNEL_SHUTDOWN_DELAY_SECONDS = 5;
134

135
  @VisibleForTesting
136
  static final Status SHUTDOWN_NOW_STATUS =
1✔
137
      Status.UNAVAILABLE.withDescription("Channel shutdownNow invoked");
1✔
138

139
  @VisibleForTesting
140
  static final Status SHUTDOWN_STATUS =
1✔
141
      Status.UNAVAILABLE.withDescription("Channel shutdown invoked");
1✔
142

143
  @VisibleForTesting
144
  static final Status SUBCHANNEL_SHUTDOWN_STATUS =
1✔
145
      Status.UNAVAILABLE.withDescription("Subchannel shutdown invoked");
1✔
146

147
  private static final ManagedChannelServiceConfig EMPTY_SERVICE_CONFIG =
148
      ManagedChannelServiceConfig.empty();
1✔
149
  private static final InternalConfigSelector INITIAL_PENDING_SELECTOR =
1✔
150
      new InternalConfigSelector() {
1✔
151
        @Override
152
        public Result selectConfig(PickSubchannelArgs args) {
153
          throw new IllegalStateException("Resolution is pending");
×
154
        }
155
      };
156

157
  private final InternalLogId logId;
158
  private final String target;
159
  @Nullable
160
  private final String authorityOverride;
161
  private final NameResolverRegistry nameResolverRegistry;
162
  private final NameResolver.Factory nameResolverFactory;
163
  private final NameResolver.Args nameResolverArgs;
164
  private final AutoConfiguredLoadBalancerFactory loadBalancerFactory;
165
  private final ClientTransportFactory originalTransportFactory;
166
  @Nullable
167
  private final ChannelCredentials originalChannelCreds;
168
  private final ClientTransportFactory transportFactory;
169
  private final ClientTransportFactory oobTransportFactory;
170
  private final RestrictedScheduledExecutor scheduledExecutor;
171
  private final Executor executor;
172
  private final ObjectPool<? extends Executor> executorPool;
173
  private final ObjectPool<? extends Executor> balancerRpcExecutorPool;
174
  private final ExecutorHolder balancerRpcExecutorHolder;
175
  private final ExecutorHolder offloadExecutorHolder;
176
  private final TimeProvider timeProvider;
177
  private final int maxTraceEvents;
178

179
  @VisibleForTesting
1✔
180
  final SynchronizationContext syncContext = new SynchronizationContext(
181
      new Thread.UncaughtExceptionHandler() {
1✔
182
        @Override
183
        public void uncaughtException(Thread t, Throwable e) {
184
          logger.log(
1✔
185
              Level.SEVERE,
186
              "[" + getLogId() + "] Uncaught exception in the SynchronizationContext. Panic!",
1✔
187
              e);
188
          panic(e);
1✔
189
        }
1✔
190
      });
191

192
  private boolean fullStreamDecompression;
193

194
  private final DecompressorRegistry decompressorRegistry;
195
  private final CompressorRegistry compressorRegistry;
196

197
  private final Supplier<Stopwatch> stopwatchSupplier;
198
  /** The timout before entering idle mode. */
199
  private final long idleTimeoutMillis;
200

201
  private final ConnectivityStateManager channelStateManager = new ConnectivityStateManager();
1✔
202
  private final BackoffPolicy.Provider backoffPolicyProvider;
203

204
  /**
205
   * We delegate to this channel, so that we can have interceptors as necessary. If there aren't
206
   * any interceptors and the {@link io.grpc.BinaryLog} is {@code null} then this will just be a
207
   * {@link RealChannel}.
208
   */
209
  private final Channel interceptorChannel;
210
  @Nullable private final String userAgent;
211

212
  // Only null after channel is terminated. Must be assigned from the syncContext.
213
  private NameResolver nameResolver;
214

215
  // Must be accessed from the syncContext.
216
  private boolean nameResolverStarted;
217

218
  // null when channel is in idle mode.  Must be assigned from syncContext.
219
  @Nullable
220
  private LbHelperImpl lbHelper;
221

222
  // Must ONLY be assigned from updateSubchannelPicker(), which is called from syncContext.
223
  // null if channel is in idle mode.
224
  @Nullable
225
  private volatile SubchannelPicker subchannelPicker;
226

227
  // Must be accessed from the syncContext
228
  private boolean panicMode;
229

230
  // Must be mutated from syncContext
231
  // If any monitoring hook to be added later needs to get a snapshot of this Set, we could
232
  // switch to a ConcurrentHashMap.
233
  private final Set<InternalSubchannel> subchannels = new HashSet<>(16, .75f);
1✔
234

235
  // Must be accessed from syncContext
236
  @Nullable
237
  private Collection<RealChannel.PendingCall<?, ?>> pendingCalls;
238
  private final Object pendingCallsInUseObject = new Object();
1✔
239

240
  // Must be mutated from syncContext
241
  private final Set<OobChannel> oobChannels = new HashSet<>(1, .75f);
1✔
242

243
  // reprocess() must be run from syncContext
244
  private final DelayedClientTransport delayedTransport;
245
  private final UncommittedRetriableStreamsRegistry uncommittedRetriableStreamsRegistry
1✔
246
      = new UncommittedRetriableStreamsRegistry();
247

248
  // Shutdown states.
249
  //
250
  // Channel's shutdown process:
251
  // 1. shutdown(): stop accepting new calls from applications
252
  //   1a shutdown <- true
253
  //   1b subchannelPicker <- null
254
  //   1c delayedTransport.shutdown()
255
  // 2. delayedTransport terminated: stop stream-creation functionality
256
  //   2a terminating <- true
257
  //   2b loadBalancer.shutdown()
258
  //     * LoadBalancer will shutdown subchannels and OOB channels
259
  //   2c loadBalancer <- null
260
  //   2d nameResolver.shutdown()
261
  //   2e nameResolver <- null
262
  // 3. All subchannels and OOB channels terminated: Channel considered terminated
263

264
  private final AtomicBoolean shutdown = new AtomicBoolean(false);
1✔
265
  // Must only be mutated and read from syncContext
266
  private boolean shutdownNowed;
267
  // Must only be mutated from syncContext
268
  private boolean terminating;
269
  // Must be mutated from syncContext
270
  private volatile boolean terminated;
271
  private final CountDownLatch terminatedLatch = new CountDownLatch(1);
1✔
272

273
  private final CallTracer.Factory callTracerFactory;
274
  private final CallTracer channelCallTracer;
275
  private final ChannelTracer channelTracer;
276
  private final ChannelLogger channelLogger;
277
  private final InternalChannelz channelz;
278
  private final RealChannel realChannel;
279
  // Must be mutated and read from syncContext
280
  // a flag for doing channel tracing when flipped
281
  private ResolutionState lastResolutionState = ResolutionState.NO_RESOLUTION;
1✔
282
  // Must be mutated and read from constructor or syncContext
283
  // used for channel tracing when value changed
284
  private ManagedChannelServiceConfig lastServiceConfig = EMPTY_SERVICE_CONFIG;
1✔
285

286
  @Nullable
287
  private final ManagedChannelServiceConfig defaultServiceConfig;
288
  // Must be mutated and read from constructor or syncContext
289
  private boolean serviceConfigUpdated = false;
1✔
290
  private final boolean lookUpServiceConfig;
291

292
  // One instance per channel.
293
  private final ChannelBufferMeter channelBufferUsed = new ChannelBufferMeter();
1✔
294

295
  private final long perRpcBufferLimit;
296
  private final long channelBufferLimit;
297

298
  // Temporary false flag that can skip the retry code path.
299
  private final boolean retryEnabled;
300

301
  // Called from syncContext
302
  private final ManagedClientTransport.Listener delayedTransportListener =
1✔
303
      new DelayedTransportListener();
304

305
  // Must be called from syncContext
306
  private void maybeShutdownNowSubchannels() {
307
    if (shutdownNowed) {
1✔
308
      for (InternalSubchannel subchannel : subchannels) {
1✔
309
        subchannel.shutdownNow(SHUTDOWN_NOW_STATUS);
1✔
310
      }
1✔
311
      for (OobChannel oobChannel : oobChannels) {
1✔
312
        oobChannel.getInternalSubchannel().shutdownNow(SHUTDOWN_NOW_STATUS);
1✔
313
      }
1✔
314
    }
315
  }
1✔
316

317
  // Must be accessed from syncContext
318
  @VisibleForTesting
1✔
319
  final InUseStateAggregator<Object> inUseStateAggregator = new IdleModeStateAggregator();
320

321
  @Override
322
  public ListenableFuture<ChannelStats> getStats() {
323
    final SettableFuture<ChannelStats> ret = SettableFuture.create();
1✔
324
    final class StatsFetcher implements Runnable {
1✔
325
      @Override
326
      public void run() {
327
        ChannelStats.Builder builder = new InternalChannelz.ChannelStats.Builder();
1✔
328
        channelCallTracer.updateBuilder(builder);
1✔
329
        channelTracer.updateBuilder(builder);
1✔
330
        builder.setTarget(target).setState(channelStateManager.getState());
1✔
331
        List<InternalWithLogId> children = new ArrayList<>();
1✔
332
        children.addAll(subchannels);
1✔
333
        children.addAll(oobChannels);
1✔
334
        builder.setSubchannels(children);
1✔
335
        ret.set(builder.build());
1✔
336
      }
1✔
337
    }
338

339
    // subchannels and oobchannels can only be accessed from syncContext
340
    syncContext.execute(new StatsFetcher());
1✔
341
    return ret;
1✔
342
  }
343

344
  @Override
345
  public InternalLogId getLogId() {
346
    return logId;
1✔
347
  }
348

349
  // Run from syncContext
350
  private class IdleModeTimer implements Runnable {
1✔
351

352
    @Override
353
    public void run() {
354
      // Workaround timer scheduled while in idle mode. This can happen from handleNotInUse() after
355
      // an explicit enterIdleMode() by the user. Protecting here as other locations are a bit too
356
      // subtle to change rapidly to resolve the channel panic. See #8714
357
      if (lbHelper == null) {
1✔
358
        return;
×
359
      }
360
      enterIdleMode();
1✔
361
    }
1✔
362
  }
363

364
  // Must be called from syncContext
365
  private void shutdownNameResolverAndLoadBalancer(boolean channelIsActive) {
366
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
367
    if (channelIsActive) {
1✔
368
      checkState(nameResolverStarted, "nameResolver is not started");
1✔
369
      checkState(lbHelper != null, "lbHelper is null");
1✔
370
    }
371
    if (nameResolver != null) {
1✔
372
      nameResolver.shutdown();
1✔
373
      nameResolverStarted = false;
1✔
374
      if (channelIsActive) {
1✔
375
        nameResolver = getNameResolver(
1✔
376
            target, authorityOverride, nameResolverFactory, nameResolverArgs);
377
      } else {
378
        nameResolver = null;
1✔
379
      }
380
    }
381
    if (lbHelper != null) {
1✔
382
      lbHelper.lb.shutdown();
1✔
383
      lbHelper = null;
1✔
384
    }
385
    subchannelPicker = null;
1✔
386
  }
1✔
387

388
  /**
389
   * Make the channel exit idle mode, if it's in it.
390
   *
391
   * <p>Must be called from syncContext
392
   */
393
  @VisibleForTesting
394
  void exitIdleMode() {
395
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
396
    if (shutdown.get() || panicMode) {
1✔
397
      return;
1✔
398
    }
399
    if (inUseStateAggregator.isInUse()) {
1✔
400
      // Cancel the timer now, so that a racing due timer will not put Channel on idleness
401
      // when the caller of exitIdleMode() is about to use the returned loadBalancer.
402
      cancelIdleTimer(false);
1✔
403
    } else {
404
      // exitIdleMode() may be called outside of inUseStateAggregator.handleNotInUse() while
405
      // isInUse() == false, in which case we still need to schedule the timer.
406
      rescheduleIdleTimer();
1✔
407
    }
408
    if (lbHelper != null) {
1✔
409
      return;
1✔
410
    }
411
    channelLogger.log(ChannelLogLevel.INFO, "Exiting idle mode");
1✔
412
    LbHelperImpl lbHelper = new LbHelperImpl();
1✔
413
    lbHelper.lb = loadBalancerFactory.newLoadBalancer(lbHelper);
1✔
414
    // Delay setting lbHelper until fully initialized, since loadBalancerFactory is user code and
415
    // may throw. We don't want to confuse our state, even if we will enter panic mode.
416
    this.lbHelper = lbHelper;
1✔
417

418
    NameResolverListener listener = new NameResolverListener(lbHelper, nameResolver);
1✔
419
    nameResolver.start(listener);
1✔
420
    nameResolverStarted = true;
1✔
421
  }
1✔
422

423
  // Must be run from syncContext
424
  private void enterIdleMode() {
425
    // nameResolver and loadBalancer are guaranteed to be non-null.  If any of them were null,
426
    // either the idleModeTimer ran twice without exiting the idle mode, or the task in shutdown()
427
    // did not cancel idleModeTimer, or enterIdle() ran while shutdown or in idle, all of
428
    // which are bugs.
429
    shutdownNameResolverAndLoadBalancer(true);
1✔
430
    delayedTransport.reprocess(null);
1✔
431
    channelLogger.log(ChannelLogLevel.INFO, "Entering IDLE state");
1✔
432
    channelStateManager.gotoState(IDLE);
1✔
433
    // If the inUseStateAggregator still considers pending calls to be queued up or the delayed
434
    // transport to be holding some we need to exit idle mode to give these calls a chance to
435
    // be processed.
436
    if (inUseStateAggregator.anyObjectInUse(pendingCallsInUseObject, delayedTransport)) {
1✔
437
      exitIdleMode();
1✔
438
    }
439
  }
1✔
440

441
  // Must be run from syncContext
442
  private void cancelIdleTimer(boolean permanent) {
443
    idleTimer.cancel(permanent);
1✔
444
  }
1✔
445

446
  // Always run from syncContext
447
  private void rescheduleIdleTimer() {
448
    if (idleTimeoutMillis == IDLE_TIMEOUT_MILLIS_DISABLE) {
1✔
449
      return;
1✔
450
    }
451
    idleTimer.reschedule(idleTimeoutMillis, TimeUnit.MILLISECONDS);
1✔
452
  }
1✔
453

454
  /**
455
   * Force name resolution refresh to happen immediately. Must be run
456
   * from syncContext.
457
   */
458
  private void refreshNameResolution() {
459
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
460
    if (nameResolverStarted) {
1✔
461
      nameResolver.refresh();
1✔
462
    }
463
  }
1✔
464

465
  private final class ChannelStreamProvider implements ClientStreamProvider {
1✔
466
    volatile Throttle throttle;
467

468
    private ClientTransport getTransport(PickSubchannelArgs args) {
469
      SubchannelPicker pickerCopy = subchannelPicker;
1✔
470
      if (shutdown.get()) {
1✔
471
        // If channel is shut down, delayedTransport is also shut down which will fail the stream
472
        // properly.
473
        return delayedTransport;
1✔
474
      }
475
      if (pickerCopy == null) {
1✔
476
        final class ExitIdleModeForTransport implements Runnable {
1✔
477
          @Override
478
          public void run() {
479
            exitIdleMode();
1✔
480
          }
1✔
481
        }
482

483
        syncContext.execute(new ExitIdleModeForTransport());
1✔
484
        return delayedTransport;
1✔
485
      }
486
      // There is no need to reschedule the idle timer here.
487
      //
488
      // pickerCopy != null, which means idle timer has not expired when this method starts.
489
      // Even if idle timer expires right after we grab pickerCopy, and it shuts down LoadBalancer
490
      // which calls Subchannel.shutdown(), the InternalSubchannel will be actually shutdown after
491
      // SUBCHANNEL_SHUTDOWN_DELAY_SECONDS, which gives the caller time to start RPC on it.
492
      //
493
      // In most cases the idle timer is scheduled to fire after the transport has created the
494
      // stream, which would have reported in-use state to the channel that would have cancelled
495
      // the idle timer.
496
      PickResult pickResult = pickerCopy.pickSubchannel(args);
1✔
497
      ClientTransport transport = GrpcUtil.getTransportFromPickResult(
1✔
498
          pickResult, args.getCallOptions().isWaitForReady());
1✔
499
      if (transport != null) {
1✔
500
        return transport;
1✔
501
      }
502
      return delayedTransport;
1✔
503
    }
504

505
    @Override
506
    public ClientStream newStream(
507
        final MethodDescriptor<?, ?> method,
508
        final CallOptions callOptions,
509
        final Metadata headers,
510
        final Context context) {
511
      if (!retryEnabled) {
1✔
512
        ClientTransport transport =
1✔
513
            getTransport(new PickSubchannelArgsImpl(method, headers, callOptions));
1✔
514
        Context origContext = context.attach();
1✔
515
        ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers(
1✔
516
            callOptions, headers, 0, /* isTransparentRetry= */ false);
517
        try {
518
          return transport.newStream(method, headers, callOptions, tracers);
1✔
519
        } finally {
520
          context.detach(origContext);
1✔
521
        }
522
      } else {
523
        MethodInfo methodInfo = callOptions.getOption(MethodInfo.KEY);
1✔
524
        final RetryPolicy retryPolicy = methodInfo == null ? null : methodInfo.retryPolicy;
1✔
525
        final HedgingPolicy hedgingPolicy = methodInfo == null ? null : methodInfo.hedgingPolicy;
1✔
526
        final class RetryStream<ReqT> extends RetriableStream<ReqT> {
527
          @SuppressWarnings("unchecked")
528
          RetryStream() {
1✔
529
            super(
1✔
530
                (MethodDescriptor<ReqT, ?>) method,
531
                headers,
532
                channelBufferUsed,
1✔
533
                perRpcBufferLimit,
1✔
534
                channelBufferLimit,
1✔
535
                getCallExecutor(callOptions),
1✔
536
                transportFactory.getScheduledExecutorService(),
1✔
537
                retryPolicy,
538
                hedgingPolicy,
539
                throttle);
540
          }
1✔
541

542
          @Override
543
          Status prestart() {
544
            return uncommittedRetriableStreamsRegistry.add(this);
1✔
545
          }
546

547
          @Override
548
          void postCommit() {
549
            uncommittedRetriableStreamsRegistry.remove(this);
1✔
550
          }
1✔
551

552
          @Override
553
          ClientStream newSubstream(
554
              Metadata newHeaders, ClientStreamTracer.Factory factory, int previousAttempts,
555
              boolean isTransparentRetry) {
556
            CallOptions newOptions = callOptions.withStreamTracerFactory(factory);
1✔
557
            ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers(
1✔
558
                newOptions, newHeaders, previousAttempts, isTransparentRetry);
559
            ClientTransport transport =
1✔
560
                getTransport(new PickSubchannelArgsImpl(method, newHeaders, newOptions));
1✔
561
            Context origContext = context.attach();
1✔
562
            try {
563
              return transport.newStream(method, newHeaders, newOptions, tracers);
1✔
564
            } finally {
565
              context.detach(origContext);
1✔
566
            }
567
          }
568
        }
569

570
        return new RetryStream<>();
1✔
571
      }
572
    }
573
  }
574

575
  private final ChannelStreamProvider transportProvider = new ChannelStreamProvider();
1✔
576

577
  private final Rescheduler idleTimer;
578

579
  ManagedChannelImpl(
580
      ManagedChannelImplBuilder builder,
581
      ClientTransportFactory clientTransportFactory,
582
      BackoffPolicy.Provider backoffPolicyProvider,
583
      ObjectPool<? extends Executor> balancerRpcExecutorPool,
584
      Supplier<Stopwatch> stopwatchSupplier,
585
      List<ClientInterceptor> interceptors,
586
      final TimeProvider timeProvider) {
1✔
587
    this.target = checkNotNull(builder.target, "target");
1✔
588
    this.logId = InternalLogId.allocate("Channel", target);
1✔
589
    this.timeProvider = checkNotNull(timeProvider, "timeProvider");
1✔
590
    this.executorPool = checkNotNull(builder.executorPool, "executorPool");
1✔
591
    this.executor = checkNotNull(executorPool.getObject(), "executor");
1✔
592
    this.originalChannelCreds = builder.channelCredentials;
1✔
593
    this.originalTransportFactory = clientTransportFactory;
1✔
594
    this.offloadExecutorHolder =
1✔
595
        new ExecutorHolder(checkNotNull(builder.offloadExecutorPool, "offloadExecutorPool"));
1✔
596
    this.transportFactory = new CallCredentialsApplyingTransportFactory(
1✔
597
        clientTransportFactory, builder.callCredentials, this.offloadExecutorHolder);
598
    this.oobTransportFactory = new CallCredentialsApplyingTransportFactory(
1✔
599
        clientTransportFactory, null, this.offloadExecutorHolder);
600
    this.scheduledExecutor =
1✔
601
        new RestrictedScheduledExecutor(transportFactory.getScheduledExecutorService());
1✔
602
    maxTraceEvents = builder.maxTraceEvents;
1✔
603
    channelTracer = new ChannelTracer(
1✔
604
        logId, builder.maxTraceEvents, timeProvider.currentTimeNanos(),
1✔
605
        "Channel for '" + target + "'");
606
    channelLogger = new ChannelLoggerImpl(channelTracer, timeProvider);
1✔
607
    ProxyDetector proxyDetector =
608
        builder.proxyDetector != null ? builder.proxyDetector : GrpcUtil.DEFAULT_PROXY_DETECTOR;
1✔
609
    this.retryEnabled = builder.retryEnabled;
1✔
610
    this.loadBalancerFactory = new AutoConfiguredLoadBalancerFactory(builder.defaultLbPolicy);
1✔
611
    this.nameResolverRegistry = builder.nameResolverRegistry;
1✔
612
    ScParser serviceConfigParser =
1✔
613
        new ScParser(
614
            retryEnabled,
615
            builder.maxRetryAttempts,
616
            builder.maxHedgedAttempts,
617
            loadBalancerFactory);
618
    this.authorityOverride = builder.authorityOverride;
1✔
619
    this.nameResolverArgs =
1✔
620
        NameResolver.Args.newBuilder()
1✔
621
            .setDefaultPort(builder.getDefaultPort())
1✔
622
            .setProxyDetector(proxyDetector)
1✔
623
            .setSynchronizationContext(syncContext)
1✔
624
            .setScheduledExecutorService(scheduledExecutor)
1✔
625
            .setServiceConfigParser(serviceConfigParser)
1✔
626
            .setChannelLogger(channelLogger)
1✔
627
            .setOffloadExecutor(this.offloadExecutorHolder)
1✔
628
            .setOverrideAuthority(this.authorityOverride)
1✔
629
            .build();
1✔
630
    this.nameResolverFactory = builder.nameResolverFactory;
1✔
631
    this.nameResolver = getNameResolver(
1✔
632
        target, authorityOverride, nameResolverFactory, nameResolverArgs);
633
    this.balancerRpcExecutorPool = checkNotNull(balancerRpcExecutorPool, "balancerRpcExecutorPool");
1✔
634
    this.balancerRpcExecutorHolder = new ExecutorHolder(balancerRpcExecutorPool);
1✔
635
    this.delayedTransport = new DelayedClientTransport(this.executor, this.syncContext);
1✔
636
    this.delayedTransport.start(delayedTransportListener);
1✔
637
    this.backoffPolicyProvider = backoffPolicyProvider;
1✔
638

639
    if (builder.defaultServiceConfig != null) {
1✔
640
      ConfigOrError parsedDefaultServiceConfig =
1✔
641
          serviceConfigParser.parseServiceConfig(builder.defaultServiceConfig);
1✔
642
      checkState(
1✔
643
          parsedDefaultServiceConfig.getError() == null,
1✔
644
          "Default config is invalid: %s",
645
          parsedDefaultServiceConfig.getError());
1✔
646
      this.defaultServiceConfig =
1✔
647
          (ManagedChannelServiceConfig) parsedDefaultServiceConfig.getConfig();
1✔
648
      this.lastServiceConfig = this.defaultServiceConfig;
1✔
649
    } else {
1✔
650
      this.defaultServiceConfig = null;
1✔
651
    }
652
    this.lookUpServiceConfig = builder.lookUpServiceConfig;
1✔
653
    realChannel = new RealChannel(nameResolver.getServiceAuthority());
1✔
654
    Channel channel = realChannel;
1✔
655
    if (builder.binlog != null) {
1✔
656
      channel = builder.binlog.wrapChannel(channel);
1✔
657
    }
658
    this.interceptorChannel = ClientInterceptors.intercept(channel, interceptors);
1✔
659
    this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier");
1✔
660
    if (builder.idleTimeoutMillis == IDLE_TIMEOUT_MILLIS_DISABLE) {
1✔
661
      this.idleTimeoutMillis = builder.idleTimeoutMillis;
1✔
662
    } else {
663
      checkArgument(
1✔
664
          builder.idleTimeoutMillis
665
              >= ManagedChannelImplBuilder.IDLE_MODE_MIN_TIMEOUT_MILLIS,
666
          "invalid idleTimeoutMillis %s", builder.idleTimeoutMillis);
667
      this.idleTimeoutMillis = builder.idleTimeoutMillis;
1✔
668
    }
669

670
    idleTimer = new Rescheduler(
1✔
671
        new IdleModeTimer(),
672
        syncContext,
673
        transportFactory.getScheduledExecutorService(),
1✔
674
        stopwatchSupplier.get());
1✔
675
    this.fullStreamDecompression = builder.fullStreamDecompression;
1✔
676
    this.decompressorRegistry = checkNotNull(builder.decompressorRegistry, "decompressorRegistry");
1✔
677
    this.compressorRegistry = checkNotNull(builder.compressorRegistry, "compressorRegistry");
1✔
678
    this.userAgent = builder.userAgent;
1✔
679

680
    this.channelBufferLimit = builder.retryBufferSize;
1✔
681
    this.perRpcBufferLimit = builder.perRpcBufferLimit;
1✔
682
    final class ChannelCallTracerFactory implements CallTracer.Factory {
1✔
683
      @Override
684
      public CallTracer create() {
685
        return new CallTracer(timeProvider);
1✔
686
      }
687
    }
688

689
    this.callTracerFactory = new ChannelCallTracerFactory();
1✔
690
    channelCallTracer = callTracerFactory.create();
1✔
691
    this.channelz = checkNotNull(builder.channelz);
1✔
692
    channelz.addRootChannel(this);
1✔
693

694
    if (!lookUpServiceConfig) {
1✔
695
      if (defaultServiceConfig != null) {
1✔
696
        channelLogger.log(
1✔
697
            ChannelLogLevel.INFO, "Service config look-up disabled, using default service config");
698
      }
699
      serviceConfigUpdated = true;
1✔
700
    }
701
  }
1✔
702

703
  private static NameResolver getNameResolver(
704
      String target, NameResolver.Factory nameResolverFactory, NameResolver.Args nameResolverArgs) {
705
    // Finding a NameResolver. Try using the target string as the URI. If that fails, try prepending
706
    // "dns:///".
707
    URI targetUri = null;
1✔
708
    StringBuilder uriSyntaxErrors = new StringBuilder();
1✔
709
    try {
710
      targetUri = new URI(target);
1✔
711
      // For "localhost:8080" this would likely cause newNameResolver to return null, because
712
      // "localhost" is parsed as the scheme. Will fall into the next branch and try
713
      // "dns:///localhost:8080".
714
    } catch (URISyntaxException e) {
1✔
715
      // Can happen with ip addresses like "[::1]:1234" or 127.0.0.1:1234.
716
      uriSyntaxErrors.append(e.getMessage());
1✔
717
    }
1✔
718
    if (targetUri != null) {
1✔
719
      NameResolver resolver = nameResolverFactory.newNameResolver(targetUri, nameResolverArgs);
1✔
720
      if (resolver != null) {
1✔
721
        return resolver;
1✔
722
      }
723
      // "foo.googleapis.com:8080" cause resolver to be null, because "foo.googleapis.com" is an
724
      // unmapped scheme. Just fall through and will try "dns:///foo.googleapis.com:8080"
725
    }
726

727
    // If we reached here, the targetUri couldn't be used.
728
    if (!URI_PATTERN.matcher(target).matches()) {
1✔
729
      // It doesn't look like a URI target. Maybe it's an authority string. Try with the default
730
      // scheme from the factory.
731
      try {
732
        targetUri = new URI(nameResolverFactory.getDefaultScheme(), "", "/" + target, null);
1✔
733
      } catch (URISyntaxException e) {
×
734
        // Should not be possible.
735
        throw new IllegalArgumentException(e);
×
736
      }
1✔
737
      NameResolver resolver = nameResolverFactory.newNameResolver(targetUri, nameResolverArgs);
1✔
738
      if (resolver != null) {
1✔
739
        return resolver;
1✔
740
      }
741
    }
742
    throw new IllegalArgumentException(String.format(
1✔
743
        "cannot find a NameResolver for %s%s",
744
        target, uriSyntaxErrors.length() > 0 ? " (" + uriSyntaxErrors + ")" : ""));
1✔
745
  }
746

747
  @VisibleForTesting
748
  static NameResolver getNameResolver(
749
      String target, @Nullable final String overrideAuthority,
750
      NameResolver.Factory nameResolverFactory, NameResolver.Args nameResolverArgs) {
751
    NameResolver resolver = getNameResolver(target, nameResolverFactory, nameResolverArgs);
1✔
752

753
    // We wrap the name resolver in a RetryingNameResolver to give it the ability to retry failures.
754
    // TODO: After a transition period, all NameResolver implementations that need retry should use
755
    //       RetryingNameResolver directly and this step can be removed.
756
    NameResolver usedNameResolver = new RetryingNameResolver(resolver,
1✔
757
          new BackoffPolicyRetryScheduler(new ExponentialBackoffPolicy.Provider(),
758
              nameResolverArgs.getScheduledExecutorService(),
1✔
759
              nameResolverArgs.getSynchronizationContext()),
1✔
760
          nameResolverArgs.getSynchronizationContext());
1✔
761

762
    if (overrideAuthority == null) {
1✔
763
      return usedNameResolver;
1✔
764
    }
765

766
    return new ForwardingNameResolver(usedNameResolver) {
1✔
767
      @Override
768
      public String getServiceAuthority() {
769
        return overrideAuthority;
1✔
770
      }
771
    };
772
  }
773

774
  @VisibleForTesting
775
  InternalConfigSelector getConfigSelector() {
776
    return realChannel.configSelector.get();
1✔
777
  }
778

779
  /**
780
   * Initiates an orderly shutdown in which preexisting calls continue but new calls are immediately
781
   * cancelled.
782
   */
783
  @Override
784
  public ManagedChannelImpl shutdown() {
785
    channelLogger.log(ChannelLogLevel.DEBUG, "shutdown() called");
1✔
786
    if (!shutdown.compareAndSet(false, true)) {
1✔
787
      return this;
1✔
788
    }
789
    final class Shutdown implements Runnable {
1✔
790
      @Override
791
      public void run() {
792
        channelLogger.log(ChannelLogLevel.INFO, "Entering SHUTDOWN state");
1✔
793
        channelStateManager.gotoState(SHUTDOWN);
1✔
794
      }
1✔
795
    }
796

797
    syncContext.execute(new Shutdown());
1✔
798
    realChannel.shutdown();
1✔
799
    final class CancelIdleTimer implements Runnable {
1✔
800
      @Override
801
      public void run() {
802
        cancelIdleTimer(/* permanent= */ true);
1✔
803
      }
1✔
804
    }
805

806
    syncContext.execute(new CancelIdleTimer());
1✔
807
    return this;
1✔
808
  }
809

810
  /**
811
   * Initiates a forceful shutdown in which preexisting and new calls are cancelled. Although
812
   * forceful, the shutdown process is still not instantaneous; {@link #isTerminated()} will likely
813
   * return {@code false} immediately after this method returns.
814
   */
815
  @Override
816
  public ManagedChannelImpl shutdownNow() {
817
    channelLogger.log(ChannelLogLevel.DEBUG, "shutdownNow() called");
1✔
818
    shutdown();
1✔
819
    realChannel.shutdownNow();
1✔
820
    final class ShutdownNow implements Runnable {
1✔
821
      @Override
822
      public void run() {
823
        if (shutdownNowed) {
1✔
824
          return;
1✔
825
        }
826
        shutdownNowed = true;
1✔
827
        maybeShutdownNowSubchannels();
1✔
828
      }
1✔
829
    }
830

831
    syncContext.execute(new ShutdownNow());
1✔
832
    return this;
1✔
833
  }
834

835
  // Called from syncContext
836
  @VisibleForTesting
837
  void panic(final Throwable t) {
838
    if (panicMode) {
1✔
839
      // Preserve the first panic information
840
      return;
×
841
    }
842
    panicMode = true;
1✔
843
    cancelIdleTimer(/* permanent= */ true);
1✔
844
    shutdownNameResolverAndLoadBalancer(false);
1✔
845
    final class PanicSubchannelPicker extends SubchannelPicker {
1✔
846
      private final PickResult panicPickResult =
1✔
847
          PickResult.withDrop(
1✔
848
              Status.INTERNAL.withDescription("Panic! This is a bug!").withCause(t));
1✔
849

850
      @Override
851
      public PickResult pickSubchannel(PickSubchannelArgs args) {
852
        return panicPickResult;
1✔
853
      }
854

855
      @Override
856
      public String toString() {
857
        return MoreObjects.toStringHelper(PanicSubchannelPicker.class)
×
858
            .add("panicPickResult", panicPickResult)
×
859
            .toString();
×
860
      }
861
    }
862

863
    updateSubchannelPicker(new PanicSubchannelPicker());
1✔
864
    realChannel.updateConfigSelector(null);
1✔
865
    channelLogger.log(ChannelLogLevel.ERROR, "PANIC! Entering TRANSIENT_FAILURE");
1✔
866
    channelStateManager.gotoState(TRANSIENT_FAILURE);
1✔
867
  }
1✔
868

869
  @VisibleForTesting
870
  boolean isInPanicMode() {
871
    return panicMode;
1✔
872
  }
873

874
  // Called from syncContext
875
  private void updateSubchannelPicker(SubchannelPicker newPicker) {
876
    subchannelPicker = newPicker;
1✔
877
    delayedTransport.reprocess(newPicker);
1✔
878
  }
1✔
879

880
  @Override
881
  public boolean isShutdown() {
882
    return shutdown.get();
1✔
883
  }
884

885
  @Override
886
  public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
887
    return terminatedLatch.await(timeout, unit);
1✔
888
  }
889

890
  @Override
891
  public boolean isTerminated() {
892
    return terminated;
1✔
893
  }
894

895
  /*
896
   * Creates a new outgoing call on the channel.
897
   */
898
  @Override
899
  public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(MethodDescriptor<ReqT, RespT> method,
900
      CallOptions callOptions) {
901
    return interceptorChannel.newCall(method, callOptions);
1✔
902
  }
903

904
  @Override
905
  public String authority() {
906
    return interceptorChannel.authority();
1✔
907
  }
908

909
  private Executor getCallExecutor(CallOptions callOptions) {
910
    Executor executor = callOptions.getExecutor();
1✔
911
    if (executor == null) {
1✔
912
      executor = this.executor;
1✔
913
    }
914
    return executor;
1✔
915
  }
916

917
  private class RealChannel extends Channel {
918
    // Reference to null if no config selector is available from resolution result
919
    // Reference must be set() from syncContext
920
    private final AtomicReference<InternalConfigSelector> configSelector =
1✔
921
        new AtomicReference<>(INITIAL_PENDING_SELECTOR);
1✔
922
    // Set when the NameResolver is initially created. When we create a new NameResolver for the
923
    // same target, the new instance must have the same value.
924
    private final String authority;
925

926
    private final Channel clientCallImplChannel = new Channel() {
1✔
927
      @Override
928
      public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
929
          MethodDescriptor<RequestT, ResponseT> method, CallOptions callOptions) {
930
        return new ClientCallImpl<>(
1✔
931
            method,
932
            getCallExecutor(callOptions),
1✔
933
            callOptions,
934
            transportProvider,
1✔
935
            terminated ? null : transportFactory.getScheduledExecutorService(),
1✔
936
            channelCallTracer,
1✔
937
            null)
938
            .setFullStreamDecompression(fullStreamDecompression)
1✔
939
            .setDecompressorRegistry(decompressorRegistry)
1✔
940
            .setCompressorRegistry(compressorRegistry);
1✔
941
      }
942

943
      @Override
944
      public String authority() {
945
        return authority;
×
946
      }
947
    };
948

949
    private RealChannel(String authority) {
1✔
950
      this.authority =  checkNotNull(authority, "authority");
1✔
951
    }
1✔
952

953
    @Override
954
    public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(
955
        MethodDescriptor<ReqT, RespT> method, CallOptions callOptions) {
956
      if (configSelector.get() != INITIAL_PENDING_SELECTOR) {
1✔
957
        return newClientCall(method, callOptions);
1✔
958
      }
959
      syncContext.execute(new Runnable() {
1✔
960
        @Override
961
        public void run() {
962
          exitIdleMode();
1✔
963
        }
1✔
964
      });
965
      if (configSelector.get() != INITIAL_PENDING_SELECTOR) {
1✔
966
        // This is an optimization for the case (typically with InProcessTransport) when name
967
        // resolution result is immediately available at this point. Otherwise, some users'
968
        // tests might observe slight behavior difference from earlier grpc versions.
969
        return newClientCall(method, callOptions);
1✔
970
      }
971
      if (shutdown.get()) {
1✔
972
        // Return a failing ClientCall.
973
        return new ClientCall<ReqT, RespT>() {
×
974
          @Override
975
          public void start(Listener<RespT> responseListener, Metadata headers) {
976
            responseListener.onClose(SHUTDOWN_STATUS, new Metadata());
×
977
          }
×
978

979
          @Override public void request(int numMessages) {}
×
980

981
          @Override public void cancel(@Nullable String message, @Nullable Throwable cause) {}
×
982

983
          @Override public void halfClose() {}
×
984

985
          @Override public void sendMessage(ReqT message) {}
×
986
        };
987
      }
988
      Context context = Context.current();
1✔
989
      final PendingCall<ReqT, RespT> pendingCall = new PendingCall<>(context, method, callOptions);
1✔
990
      syncContext.execute(new Runnable() {
1✔
991
        @Override
992
        public void run() {
993
          if (configSelector.get() == INITIAL_PENDING_SELECTOR) {
1✔
994
            if (pendingCalls == null) {
1✔
995
              pendingCalls = new LinkedHashSet<>();
1✔
996
              inUseStateAggregator.updateObjectInUse(pendingCallsInUseObject, true);
1✔
997
            }
998
            pendingCalls.add(pendingCall);
1✔
999
          } else {
1000
            pendingCall.reprocess();
1✔
1001
          }
1002
        }
1✔
1003
      });
1004
      return pendingCall;
1✔
1005
    }
1006

1007
    // Must run in SynchronizationContext.
1008
    void updateConfigSelector(@Nullable InternalConfigSelector config) {
1009
      InternalConfigSelector prevConfig = configSelector.get();
1✔
1010
      configSelector.set(config);
1✔
1011
      if (prevConfig == INITIAL_PENDING_SELECTOR && pendingCalls != null) {
1✔
1012
        for (RealChannel.PendingCall<?, ?> pendingCall : pendingCalls) {
1✔
1013
          pendingCall.reprocess();
1✔
1014
        }
1✔
1015
      }
1016
    }
1✔
1017

1018
    // Must run in SynchronizationContext.
1019
    void onConfigError() {
1020
      if (configSelector.get() == INITIAL_PENDING_SELECTOR) {
1✔
1021
        updateConfigSelector(null);
1✔
1022
      }
1023
    }
1✔
1024

1025
    void shutdown() {
1026
      final class RealChannelShutdown implements Runnable {
1✔
1027
        @Override
1028
        public void run() {
1029
          if (pendingCalls == null) {
1✔
1030
            if (configSelector.get() == INITIAL_PENDING_SELECTOR) {
1✔
1031
              configSelector.set(null);
1✔
1032
            }
1033
            uncommittedRetriableStreamsRegistry.onShutdown(SHUTDOWN_STATUS);
1✔
1034
          }
1035
        }
1✔
1036
      }
1037

1038
      syncContext.execute(new RealChannelShutdown());
1✔
1039
    }
1✔
1040

1041
    void shutdownNow() {
1042
      final class RealChannelShutdownNow implements Runnable {
1✔
1043
        @Override
1044
        public void run() {
1045
          if (configSelector.get() == INITIAL_PENDING_SELECTOR) {
1✔
1046
            configSelector.set(null);
1✔
1047
          }
1048
          if (pendingCalls != null) {
1✔
1049
            for (RealChannel.PendingCall<?, ?> pendingCall : pendingCalls) {
1✔
1050
              pendingCall.cancel("Channel is forcefully shutdown", null);
1✔
1051
            }
1✔
1052
          }
1053
          uncommittedRetriableStreamsRegistry.onShutdownNow(SHUTDOWN_NOW_STATUS);
1✔
1054
        }
1✔
1055
      }
1056

1057
      syncContext.execute(new RealChannelShutdownNow());
1✔
1058
    }
1✔
1059

1060
    @Override
1061
    public String authority() {
1062
      return authority;
1✔
1063
    }
1064

1065
    private final class PendingCall<ReqT, RespT> extends DelayedClientCall<ReqT, RespT> {
1066
      final Context context;
1067
      final MethodDescriptor<ReqT, RespT> method;
1068
      final CallOptions callOptions;
1069

1070
      PendingCall(
1071
          Context context, MethodDescriptor<ReqT, RespT> method, CallOptions callOptions) {
1✔
1072
        super(getCallExecutor(callOptions), scheduledExecutor, callOptions.getDeadline());
1✔
1073
        this.context = context;
1✔
1074
        this.method = method;
1✔
1075
        this.callOptions = callOptions;
1✔
1076
      }
1✔
1077

1078
      /** Called when it's ready to create a real call and reprocess the pending call. */
1079
      void reprocess() {
1080
        ClientCall<ReqT, RespT> realCall;
1081
        Context previous = context.attach();
1✔
1082
        try {
1083
          CallOptions delayResolutionOption = callOptions.withOption(NAME_RESOLUTION_DELAYED, true);
1✔
1084
          realCall = newClientCall(method, delayResolutionOption);
1✔
1085
        } finally {
1086
          context.detach(previous);
1✔
1087
        }
1088
        Runnable toRun = setCall(realCall);
1✔
1089
        if (toRun == null) {
1✔
1090
          syncContext.execute(new PendingCallRemoval());
1✔
1091
        } else {
1092
          getCallExecutor(callOptions).execute(new Runnable() {
1✔
1093
            @Override
1094
            public void run() {
1095
              toRun.run();
1✔
1096
              syncContext.execute(new PendingCallRemoval());
1✔
1097
            }
1✔
1098
          });
1099
        }
1100
      }
1✔
1101

1102
      @Override
1103
      protected void callCancelled() {
1104
        super.callCancelled();
1✔
1105
        syncContext.execute(new PendingCallRemoval());
1✔
1106
      }
1✔
1107

1108
      final class PendingCallRemoval implements Runnable {
1✔
1109
        @Override
1110
        public void run() {
1111
          if (pendingCalls != null) {
1✔
1112
            pendingCalls.remove(PendingCall.this);
1✔
1113
            if (pendingCalls.isEmpty()) {
1✔
1114
              inUseStateAggregator.updateObjectInUse(pendingCallsInUseObject, false);
1✔
1115
              pendingCalls = null;
1✔
1116
              if (shutdown.get()) {
1✔
1117
                uncommittedRetriableStreamsRegistry.onShutdown(SHUTDOWN_STATUS);
1✔
1118
              }
1119
            }
1120
          }
1121
        }
1✔
1122
      }
1123
    }
1124

1125
    private <ReqT, RespT> ClientCall<ReqT, RespT> newClientCall(
1126
        MethodDescriptor<ReqT, RespT> method, CallOptions callOptions) {
1127
      InternalConfigSelector selector = configSelector.get();
1✔
1128
      if (selector == null) {
1✔
1129
        return clientCallImplChannel.newCall(method, callOptions);
1✔
1130
      }
1131
      if (selector instanceof ServiceConfigConvertedSelector) {
1✔
1132
        MethodInfo methodInfo =
1✔
1133
            ((ServiceConfigConvertedSelector) selector).config.getMethodConfig(method);
1✔
1134
        if (methodInfo != null) {
1✔
1135
          callOptions = callOptions.withOption(MethodInfo.KEY, methodInfo);
1✔
1136
        }
1137
        return clientCallImplChannel.newCall(method, callOptions);
1✔
1138
      }
1139
      return new ConfigSelectingClientCall<>(
1✔
1140
          selector, clientCallImplChannel, executor, method, callOptions);
1✔
1141
    }
1142
  }
1143

1144
  /**
1145
   * A client call for a given channel that applies a given config selector when it starts.
1146
   */
1147
  static final class ConfigSelectingClientCall<ReqT, RespT>
1148
      extends ForwardingClientCall<ReqT, RespT> {
1149

1150
    private final InternalConfigSelector configSelector;
1151
    private final Channel channel;
1152
    private final Executor callExecutor;
1153
    private final MethodDescriptor<ReqT, RespT> method;
1154
    private final Context context;
1155
    private CallOptions callOptions;
1156

1157
    private ClientCall<ReqT, RespT> delegate;
1158

1159
    ConfigSelectingClientCall(
1160
        InternalConfigSelector configSelector, Channel channel, Executor channelExecutor,
1161
        MethodDescriptor<ReqT, RespT> method,
1162
        CallOptions callOptions) {
1✔
1163
      this.configSelector = configSelector;
1✔
1164
      this.channel = channel;
1✔
1165
      this.method = method;
1✔
1166
      this.callExecutor =
1✔
1167
          callOptions.getExecutor() == null ? channelExecutor : callOptions.getExecutor();
1✔
1168
      this.callOptions = callOptions.withExecutor(callExecutor);
1✔
1169
      this.context = Context.current();
1✔
1170
    }
1✔
1171

1172
    @Override
1173
    protected ClientCall<ReqT, RespT> delegate() {
1174
      return delegate;
1✔
1175
    }
1176

1177
    @SuppressWarnings("unchecked")
1178
    @Override
1179
    public void start(Listener<RespT> observer, Metadata headers) {
1180
      PickSubchannelArgs args = new PickSubchannelArgsImpl(method, headers, callOptions);
1✔
1181
      InternalConfigSelector.Result result = configSelector.selectConfig(args);
1✔
1182
      Status status = result.getStatus();
1✔
1183
      if (!status.isOk()) {
1✔
1184
        executeCloseObserverInContext(observer,
1✔
1185
            GrpcUtil.replaceInappropriateControlPlaneStatus(status));
1✔
1186
        delegate = (ClientCall<ReqT, RespT>) NOOP_CALL;
1✔
1187
        return;
1✔
1188
      }
1189
      ClientInterceptor interceptor = result.getInterceptor();
1✔
1190
      ManagedChannelServiceConfig config = (ManagedChannelServiceConfig) result.getConfig();
1✔
1191
      MethodInfo methodInfo = config.getMethodConfig(method);
1✔
1192
      if (methodInfo != null) {
1✔
1193
        callOptions = callOptions.withOption(MethodInfo.KEY, methodInfo);
1✔
1194
      }
1195
      if (interceptor != null) {
1✔
1196
        delegate = interceptor.interceptCall(method, callOptions, channel);
1✔
1197
      } else {
1198
        delegate = channel.newCall(method, callOptions);
×
1199
      }
1200
      delegate.start(observer, headers);
1✔
1201
    }
1✔
1202

1203
    private void executeCloseObserverInContext(
1204
        final Listener<RespT> observer, final Status status) {
1205
      class CloseInContext extends ContextRunnable {
1206
        CloseInContext() {
1✔
1207
          super(context);
1✔
1208
        }
1✔
1209

1210
        @Override
1211
        public void runInContext() {
1212
          observer.onClose(status, new Metadata());
1✔
1213
        }
1✔
1214
      }
1215

1216
      callExecutor.execute(new CloseInContext());
1✔
1217
    }
1✔
1218

1219
    @Override
1220
    public void cancel(@Nullable String message, @Nullable Throwable cause) {
1221
      if (delegate != null) {
×
1222
        delegate.cancel(message, cause);
×
1223
      }
1224
    }
×
1225
  }
1226

1227
  private static final ClientCall<Object, Object> NOOP_CALL = new ClientCall<Object, Object>() {
1✔
1228
    @Override
1229
    public void start(Listener<Object> responseListener, Metadata headers) {}
×
1230

1231
    @Override
1232
    public void request(int numMessages) {}
1✔
1233

1234
    @Override
1235
    public void cancel(String message, Throwable cause) {}
×
1236

1237
    @Override
1238
    public void halfClose() {}
×
1239

1240
    @Override
1241
    public void sendMessage(Object message) {}
×
1242

1243
    // Always returns {@code false}, since this is only used when the startup of the call fails.
1244
    @Override
1245
    public boolean isReady() {
1246
      return false;
×
1247
    }
1248
  };
1249

1250
  /**
1251
   * Terminate the channel if termination conditions are met.
1252
   */
1253
  // Must be run from syncContext
1254
  private void maybeTerminateChannel() {
1255
    if (terminated) {
1✔
1256
      return;
×
1257
    }
1258
    if (shutdown.get() && subchannels.isEmpty() && oobChannels.isEmpty()) {
1✔
1259
      channelLogger.log(ChannelLogLevel.INFO, "Terminated");
1✔
1260
      channelz.removeRootChannel(this);
1✔
1261
      executorPool.returnObject(executor);
1✔
1262
      balancerRpcExecutorHolder.release();
1✔
1263
      offloadExecutorHolder.release();
1✔
1264
      // Release the transport factory so that it can deallocate any resources.
1265
      transportFactory.close();
1✔
1266

1267
      terminated = true;
1✔
1268
      terminatedLatch.countDown();
1✔
1269
    }
1270
  }
1✔
1271

1272
  // Must be called from syncContext
1273
  private void handleInternalSubchannelState(ConnectivityStateInfo newState) {
1274
    if (newState.getState() == TRANSIENT_FAILURE || newState.getState() == IDLE) {
1✔
1275
      refreshNameResolution();
1✔
1276
    }
1277
  }
1✔
1278

1279
  @Override
1280
  @SuppressWarnings("deprecation")
1281
  public ConnectivityState getState(boolean requestConnection) {
1282
    ConnectivityState savedChannelState = channelStateManager.getState();
1✔
1283
    if (requestConnection && savedChannelState == IDLE) {
1✔
1284
      final class RequestConnection implements Runnable {
1✔
1285
        @Override
1286
        public void run() {
1287
          exitIdleMode();
1✔
1288
          if (subchannelPicker != null) {
1✔
1289
            subchannelPicker.requestConnection();
1✔
1290
          }
1291
          if (lbHelper != null) {
1✔
1292
            lbHelper.lb.requestConnection();
1✔
1293
          }
1294
        }
1✔
1295
      }
1296

1297
      syncContext.execute(new RequestConnection());
1✔
1298
    }
1299
    return savedChannelState;
1✔
1300
  }
1301

1302
  @Override
1303
  public void notifyWhenStateChanged(final ConnectivityState source, final Runnable callback) {
1304
    final class NotifyStateChanged implements Runnable {
1✔
1305
      @Override
1306
      public void run() {
1307
        channelStateManager.notifyWhenStateChanged(callback, executor, source);
1✔
1308
      }
1✔
1309
    }
1310

1311
    syncContext.execute(new NotifyStateChanged());
1✔
1312
  }
1✔
1313

1314
  @Override
1315
  public void resetConnectBackoff() {
1316
    final class ResetConnectBackoff implements Runnable {
1✔
1317
      @Override
1318
      public void run() {
1319
        if (shutdown.get()) {
1✔
1320
          return;
1✔
1321
        }
1322
        if (nameResolverStarted) {
1✔
1323
          refreshNameResolution();
1✔
1324
        }
1325
        for (InternalSubchannel subchannel : subchannels) {
1✔
1326
          subchannel.resetConnectBackoff();
1✔
1327
        }
1✔
1328
        for (OobChannel oobChannel : oobChannels) {
1✔
1329
          oobChannel.resetConnectBackoff();
×
1330
        }
×
1331
      }
1✔
1332
    }
1333

1334
    syncContext.execute(new ResetConnectBackoff());
1✔
1335
  }
1✔
1336

1337
  @Override
1338
  public void enterIdle() {
1339
    final class PrepareToLoseNetworkRunnable implements Runnable {
1✔
1340
      @Override
1341
      public void run() {
1342
        if (shutdown.get() || lbHelper == null) {
1✔
1343
          return;
1✔
1344
        }
1345
        cancelIdleTimer(/* permanent= */ false);
1✔
1346
        enterIdleMode();
1✔
1347
      }
1✔
1348
    }
1349

1350
    syncContext.execute(new PrepareToLoseNetworkRunnable());
1✔
1351
  }
1✔
1352

1353
  /**
1354
   * A registry that prevents channel shutdown from killing existing retry attempts that are in
1355
   * backoff.
1356
   */
1357
  private final class UncommittedRetriableStreamsRegistry {
1✔
1358
    // TODO(zdapeng): This means we would acquire a lock for each new retry-able stream,
1359
    // it's worthwhile to look for a lock-free approach.
1360
    final Object lock = new Object();
1✔
1361

1362
    @GuardedBy("lock")
1✔
1363
    Collection<ClientStream> uncommittedRetriableStreams = new HashSet<>();
1364

1365
    @GuardedBy("lock")
1366
    Status shutdownStatus;
1367

1368
    void onShutdown(Status reason) {
1369
      boolean shouldShutdownDelayedTransport = false;
1✔
1370
      synchronized (lock) {
1✔
1371
        if (shutdownStatus != null) {
1✔
1372
          return;
1✔
1373
        }
1374
        shutdownStatus = reason;
1✔
1375
        // Keep the delayedTransport open until there is no more uncommitted streams, b/c those
1376
        // retriable streams, which may be in backoff and not using any transport, are already
1377
        // started RPCs.
1378
        if (uncommittedRetriableStreams.isEmpty()) {
1✔
1379
          shouldShutdownDelayedTransport = true;
1✔
1380
        }
1381
      }
1✔
1382

1383
      if (shouldShutdownDelayedTransport) {
1✔
1384
        delayedTransport.shutdown(reason);
1✔
1385
      }
1386
    }
1✔
1387

1388
    void onShutdownNow(Status reason) {
1389
      onShutdown(reason);
1✔
1390
      Collection<ClientStream> streams;
1391

1392
      synchronized (lock) {
1✔
1393
        streams = new ArrayList<>(uncommittedRetriableStreams);
1✔
1394
      }
1✔
1395

1396
      for (ClientStream stream : streams) {
1✔
1397
        stream.cancel(reason);
1✔
1398
      }
1✔
1399
      delayedTransport.shutdownNow(reason);
1✔
1400
    }
1✔
1401

1402
    /**
1403
     * Registers a RetriableStream and return null if not shutdown, otherwise just returns the
1404
     * shutdown Status.
1405
     */
1406
    @Nullable
1407
    Status add(RetriableStream<?> retriableStream) {
1408
      synchronized (lock) {
1✔
1409
        if (shutdownStatus != null) {
1✔
1410
          return shutdownStatus;
1✔
1411
        }
1412
        uncommittedRetriableStreams.add(retriableStream);
1✔
1413
        return null;
1✔
1414
      }
1415
    }
1416

1417
    void remove(RetriableStream<?> retriableStream) {
1418
      Status shutdownStatusCopy = null;
1✔
1419

1420
      synchronized (lock) {
1✔
1421
        uncommittedRetriableStreams.remove(retriableStream);
1✔
1422
        if (uncommittedRetriableStreams.isEmpty()) {
1✔
1423
          shutdownStatusCopy = shutdownStatus;
1✔
1424
          // Because retriable transport is long-lived, we take this opportunity to down-size the
1425
          // hashmap.
1426
          uncommittedRetriableStreams = new HashSet<>();
1✔
1427
        }
1428
      }
1✔
1429

1430
      if (shutdownStatusCopy != null) {
1✔
1431
        delayedTransport.shutdown(shutdownStatusCopy);
1✔
1432
      }
1433
    }
1✔
1434
  }
1435

1436
  private final class LbHelperImpl extends LoadBalancer.Helper {
1✔
1437
    AutoConfiguredLoadBalancer lb;
1438

1439
    @Override
1440
    public AbstractSubchannel createSubchannel(CreateSubchannelArgs args) {
1441
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1442
      // No new subchannel should be created after load balancer has been shutdown.
1443
      checkState(!terminating, "Channel is being terminated");
1✔
1444
      return new SubchannelImpl(args);
1✔
1445
    }
1446

1447
    @Override
1448
    public void updateBalancingState(
1449
        final ConnectivityState newState, final SubchannelPicker newPicker) {
1450
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1451
      checkNotNull(newState, "newState");
1✔
1452
      checkNotNull(newPicker, "newPicker");
1✔
1453
      final class UpdateBalancingState implements Runnable {
1✔
1454
        @Override
1455
        public void run() {
1456
          if (LbHelperImpl.this != lbHelper) {
1✔
1457
            return;
1✔
1458
          }
1459
          updateSubchannelPicker(newPicker);
1✔
1460
          // It's not appropriate to report SHUTDOWN state from lb.
1461
          // Ignore the case of newState == SHUTDOWN for now.
1462
          if (newState != SHUTDOWN) {
1✔
1463
            channelLogger.log(
1✔
1464
                ChannelLogLevel.INFO, "Entering {0} state with picker: {1}", newState, newPicker);
1465
            channelStateManager.gotoState(newState);
1✔
1466
          }
1467
        }
1✔
1468
      }
1469

1470
      syncContext.execute(new UpdateBalancingState());
1✔
1471
    }
1✔
1472

1473
    @Override
1474
    public void refreshNameResolution() {
1475
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1476
      final class LoadBalancerRefreshNameResolution implements Runnable {
1✔
1477
        @Override
1478
        public void run() {
1479
          ManagedChannelImpl.this.refreshNameResolution();
1✔
1480
        }
1✔
1481
      }
1482

1483
      syncContext.execute(new LoadBalancerRefreshNameResolution());
1✔
1484
    }
1✔
1485

1486
    @Override
1487
    public ManagedChannel createOobChannel(EquivalentAddressGroup addressGroup, String authority) {
1488
      return createOobChannel(Collections.singletonList(addressGroup), authority);
×
1489
    }
1490

1491
    @Override
1492
    public ManagedChannel createOobChannel(List<EquivalentAddressGroup> addressGroup,
1493
        String authority) {
1494
      // TODO(ejona): can we be even stricter? Like terminating?
1495
      checkState(!terminated, "Channel is terminated");
1✔
1496
      long oobChannelCreationTime = timeProvider.currentTimeNanos();
1✔
1497
      InternalLogId oobLogId = InternalLogId.allocate("OobChannel", /*details=*/ null);
1✔
1498
      InternalLogId subchannelLogId =
1✔
1499
          InternalLogId.allocate("Subchannel-OOB", /*details=*/ authority);
1✔
1500
      ChannelTracer oobChannelTracer =
1✔
1501
          new ChannelTracer(
1502
              oobLogId, maxTraceEvents, oobChannelCreationTime,
1✔
1503
              "OobChannel for " + addressGroup);
1504
      final OobChannel oobChannel = new OobChannel(
1✔
1505
          authority, balancerRpcExecutorPool, oobTransportFactory.getScheduledExecutorService(),
1✔
1506
          syncContext, callTracerFactory.create(), oobChannelTracer, channelz, timeProvider);
1✔
1507
      channelTracer.reportEvent(new ChannelTrace.Event.Builder()
1✔
1508
          .setDescription("Child OobChannel created")
1✔
1509
          .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
1✔
1510
          .setTimestampNanos(oobChannelCreationTime)
1✔
1511
          .setChannelRef(oobChannel)
1✔
1512
          .build());
1✔
1513
      ChannelTracer subchannelTracer =
1✔
1514
          new ChannelTracer(subchannelLogId, maxTraceEvents, oobChannelCreationTime,
1✔
1515
              "Subchannel for " + addressGroup);
1516
      ChannelLogger subchannelLogger = new ChannelLoggerImpl(subchannelTracer, timeProvider);
1✔
1517
      final class ManagedOobChannelCallback extends InternalSubchannel.Callback {
1✔
1518
        @Override
1519
        void onTerminated(InternalSubchannel is) {
1520
          oobChannels.remove(oobChannel);
1✔
1521
          channelz.removeSubchannel(is);
1✔
1522
          oobChannel.handleSubchannelTerminated();
1✔
1523
          maybeTerminateChannel();
1✔
1524
        }
1✔
1525

1526
        @Override
1527
        void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) {
1528
          // TODO(chengyuanzhang): change to let LB policies explicitly manage OOB channel's
1529
          //  state and refresh name resolution if necessary.
1530
          handleInternalSubchannelState(newState);
1✔
1531
          oobChannel.handleSubchannelStateChange(newState);
1✔
1532
        }
1✔
1533
      }
1534

1535
      final InternalSubchannel internalSubchannel = new InternalSubchannel(
1✔
1536
          addressGroup,
1537
          authority, userAgent, backoffPolicyProvider, oobTransportFactory,
1✔
1538
          oobTransportFactory.getScheduledExecutorService(), stopwatchSupplier, syncContext,
1✔
1539
          // All callback methods are run from syncContext
1540
          new ManagedOobChannelCallback(),
1541
          channelz,
1✔
1542
          callTracerFactory.create(),
1✔
1543
          subchannelTracer,
1544
          subchannelLogId,
1545
          subchannelLogger);
1546
      oobChannelTracer.reportEvent(new ChannelTrace.Event.Builder()
1✔
1547
          .setDescription("Child Subchannel created")
1✔
1548
          .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
1✔
1549
          .setTimestampNanos(oobChannelCreationTime)
1✔
1550
          .setSubchannelRef(internalSubchannel)
1✔
1551
          .build());
1✔
1552
      channelz.addSubchannel(oobChannel);
1✔
1553
      channelz.addSubchannel(internalSubchannel);
1✔
1554
      oobChannel.setSubchannel(internalSubchannel);
1✔
1555
      final class AddOobChannel implements Runnable {
1✔
1556
        @Override
1557
        public void run() {
1558
          if (terminating) {
1✔
1559
            oobChannel.shutdown();
×
1560
          }
1561
          if (!terminated) {
1✔
1562
            // If channel has not terminated, it will track the subchannel and block termination
1563
            // for it.
1564
            oobChannels.add(oobChannel);
1✔
1565
          }
1566
        }
1✔
1567
      }
1568

1569
      syncContext.execute(new AddOobChannel());
1✔
1570
      return oobChannel;
1✔
1571
    }
1572

1573
    @Deprecated
1574
    @Override
1575
    public ManagedChannelBuilder<?> createResolvingOobChannelBuilder(String target) {
1576
      return createResolvingOobChannelBuilder(target, new DefaultChannelCreds())
1✔
1577
          // Override authority to keep the old behavior.
1578
          // createResolvingOobChannelBuilder(String target) will be deleted soon.
1579
          .overrideAuthority(getAuthority());
1✔
1580
    }
1581

1582
    // TODO(creamsoup) prevent main channel to shutdown if oob channel is not terminated
1583
    // TODO(zdapeng) register the channel as a subchannel of the parent channel in channelz.
1584
    @Override
1585
    public ManagedChannelBuilder<?> createResolvingOobChannelBuilder(
1586
        final String target, final ChannelCredentials channelCreds) {
1587
      checkNotNull(channelCreds, "channelCreds");
1✔
1588

1589
      final class ResolvingOobChannelBuilder
1590
          extends ForwardingChannelBuilder<ResolvingOobChannelBuilder> {
1591
        final ManagedChannelBuilder<?> delegate;
1592

1593
        ResolvingOobChannelBuilder() {
1✔
1594
          final ClientTransportFactory transportFactory;
1595
          CallCredentials callCredentials;
1596
          if (channelCreds instanceof DefaultChannelCreds) {
1✔
1597
            transportFactory = originalTransportFactory;
1✔
1598
            callCredentials = null;
1✔
1599
          } else {
1600
            SwapChannelCredentialsResult swapResult =
1✔
1601
                originalTransportFactory.swapChannelCredentials(channelCreds);
1✔
1602
            if (swapResult == null) {
1✔
1603
              delegate = Grpc.newChannelBuilder(target, channelCreds);
×
1604
              return;
×
1605
            } else {
1606
              transportFactory = swapResult.transportFactory;
1✔
1607
              callCredentials = swapResult.callCredentials;
1✔
1608
            }
1609
          }
1610
          ClientTransportFactoryBuilder transportFactoryBuilder =
1✔
1611
              new ClientTransportFactoryBuilder() {
1✔
1612
                @Override
1613
                public ClientTransportFactory buildClientTransportFactory() {
1614
                  return transportFactory;
1✔
1615
                }
1616
              };
1617
          delegate = new ManagedChannelImplBuilder(
1✔
1618
              target,
1619
              channelCreds,
1620
              callCredentials,
1621
              transportFactoryBuilder,
1622
              new FixedPortProvider(nameResolverArgs.getDefaultPort()));
1✔
1623
        }
1✔
1624

1625
        @Override
1626
        protected ManagedChannelBuilder<?> delegate() {
1627
          return delegate;
1✔
1628
        }
1629
      }
1630

1631
      checkState(!terminated, "Channel is terminated");
1✔
1632

1633
      @SuppressWarnings("deprecation")
1634
      ResolvingOobChannelBuilder builder = new ResolvingOobChannelBuilder()
1✔
1635
          .nameResolverFactory(nameResolverFactory);
1✔
1636

1637
      return builder
1✔
1638
          // TODO(zdapeng): executors should not outlive the parent channel.
1639
          .executor(executor)
1✔
1640
          .offloadExecutor(offloadExecutorHolder.getExecutor())
1✔
1641
          .maxTraceEvents(maxTraceEvents)
1✔
1642
          .proxyDetector(nameResolverArgs.getProxyDetector())
1✔
1643
          .userAgent(userAgent);
1✔
1644
    }
1645

1646
    @Override
1647
    public ChannelCredentials getUnsafeChannelCredentials() {
1648
      if (originalChannelCreds == null) {
×
1649
        return new DefaultChannelCreds();
×
1650
      }
1651
      return originalChannelCreds;
×
1652
    }
1653

1654
    @Override
1655
    public void updateOobChannelAddresses(ManagedChannel channel, EquivalentAddressGroup eag) {
1656
      updateOobChannelAddresses(channel, Collections.singletonList(eag));
×
1657
    }
×
1658

1659
    @Override
1660
    public void updateOobChannelAddresses(ManagedChannel channel,
1661
        List<EquivalentAddressGroup> eag) {
1662
      checkArgument(channel instanceof OobChannel,
1✔
1663
          "channel must have been returned from createOobChannel");
1664
      ((OobChannel) channel).updateAddresses(eag);
1✔
1665
    }
1✔
1666

1667
    @Override
1668
    public String getAuthority() {
1669
      return ManagedChannelImpl.this.authority();
1✔
1670
    }
1671

1672
    @Override
1673
    public SynchronizationContext getSynchronizationContext() {
1674
      return syncContext;
1✔
1675
    }
1676

1677
    @Override
1678
    public ScheduledExecutorService getScheduledExecutorService() {
1679
      return scheduledExecutor;
1✔
1680
    }
1681

1682
    @Override
1683
    public ChannelLogger getChannelLogger() {
1684
      return channelLogger;
1✔
1685
    }
1686

1687
    @Override
1688
    public NameResolver.Args getNameResolverArgs() {
1689
      return nameResolverArgs;
1✔
1690
    }
1691

1692
    @Override
1693
    public NameResolverRegistry getNameResolverRegistry() {
1694
      return nameResolverRegistry;
1✔
1695
    }
1696

1697
    /**
1698
     * A placeholder for channel creds if user did not specify channel creds for the channel.
1699
     */
1700
    // TODO(zdapeng): get rid of this class and let all ChannelBuilders always provide a non-null
1701
    //     channel creds.
1702
    final class DefaultChannelCreds extends ChannelCredentials {
1✔
1703
      @Override
1704
      public ChannelCredentials withoutBearerTokens() {
1705
        return this;
×
1706
      }
1707
    }
1708
  }
1709

1710
  final class NameResolverListener extends NameResolver.Listener2 {
1711
    final LbHelperImpl helper;
1712
    final NameResolver resolver;
1713

1714
    NameResolverListener(LbHelperImpl helperImpl, NameResolver resolver) {
1✔
1715
      this.helper = checkNotNull(helperImpl, "helperImpl");
1✔
1716
      this.resolver = checkNotNull(resolver, "resolver");
1✔
1717
    }
1✔
1718

1719
    @Override
1720
    public void onResult(final ResolutionResult resolutionResult) {
1721
      final class NamesResolved implements Runnable {
1✔
1722

1723
        @SuppressWarnings("ReferenceEquality")
1724
        @Override
1725
        public void run() {
1726
          if (ManagedChannelImpl.this.nameResolver != resolver) {
1✔
1727
            return;
1✔
1728
          }
1729

1730
          List<EquivalentAddressGroup> servers = resolutionResult.getAddresses();
1✔
1731
          channelLogger.log(
1✔
1732
              ChannelLogLevel.DEBUG,
1733
              "Resolved address: {0}, config={1}",
1734
              servers,
1735
              resolutionResult.getAttributes());
1✔
1736

1737
          if (lastResolutionState != ResolutionState.SUCCESS) {
1✔
1738
            channelLogger.log(ChannelLogLevel.INFO, "Address resolved: {0}", servers);
1✔
1739
            lastResolutionState = ResolutionState.SUCCESS;
1✔
1740
          }
1741

1742
          ConfigOrError configOrError = resolutionResult.getServiceConfig();
1✔
1743
          ResolutionResultListener resolutionResultListener = resolutionResult.getAttributes()
1✔
1744
              .get(RetryingNameResolver.RESOLUTION_RESULT_LISTENER_KEY);
1✔
1745
          InternalConfigSelector resolvedConfigSelector =
1✔
1746
              resolutionResult.getAttributes().get(InternalConfigSelector.KEY);
1✔
1747
          ManagedChannelServiceConfig validServiceConfig =
1748
              configOrError != null && configOrError.getConfig() != null
1✔
1749
                  ? (ManagedChannelServiceConfig) configOrError.getConfig()
1✔
1750
                  : null;
1✔
1751
          Status serviceConfigError = configOrError != null ? configOrError.getError() : null;
1✔
1752

1753
          ManagedChannelServiceConfig effectiveServiceConfig;
1754
          if (!lookUpServiceConfig) {
1✔
1755
            if (validServiceConfig != null) {
1✔
1756
              channelLogger.log(
1✔
1757
                  ChannelLogLevel.INFO,
1758
                  "Service config from name resolver discarded by channel settings");
1759
            }
1760
            effectiveServiceConfig =
1761
                defaultServiceConfig == null ? EMPTY_SERVICE_CONFIG : defaultServiceConfig;
1✔
1762
            if (resolvedConfigSelector != null) {
1✔
1763
              channelLogger.log(
1✔
1764
                  ChannelLogLevel.INFO,
1765
                  "Config selector from name resolver discarded by channel settings");
1766
            }
1767
            realChannel.updateConfigSelector(effectiveServiceConfig.getDefaultConfigSelector());
1✔
1768
          } else {
1769
            // Try to use config if returned from name resolver
1770
            // Otherwise, try to use the default config if available
1771
            if (validServiceConfig != null) {
1✔
1772
              effectiveServiceConfig = validServiceConfig;
1✔
1773
              if (resolvedConfigSelector != null) {
1✔
1774
                realChannel.updateConfigSelector(resolvedConfigSelector);
1✔
1775
                if (effectiveServiceConfig.getDefaultConfigSelector() != null) {
1✔
1776
                  channelLogger.log(
×
1777
                      ChannelLogLevel.DEBUG,
1778
                      "Method configs in service config will be discarded due to presence of"
1779
                          + "config-selector");
1780
                }
1781
              } else {
1782
                realChannel.updateConfigSelector(effectiveServiceConfig.getDefaultConfigSelector());
1✔
1783
              }
1784
            } else if (defaultServiceConfig != null) {
1✔
1785
              effectiveServiceConfig = defaultServiceConfig;
1✔
1786
              realChannel.updateConfigSelector(effectiveServiceConfig.getDefaultConfigSelector());
1✔
1787
              channelLogger.log(
1✔
1788
                  ChannelLogLevel.INFO,
1789
                  "Received no service config, using default service config");
1790
            } else if (serviceConfigError != null) {
1✔
1791
              if (!serviceConfigUpdated) {
1✔
1792
                // First DNS lookup has invalid service config, and cannot fall back to default
1793
                channelLogger.log(
1✔
1794
                    ChannelLogLevel.INFO,
1795
                    "Fallback to error due to invalid first service config without default config");
1796
                // This error could be an "inappropriate" control plane error that should not bleed
1797
                // through to client code using gRPC. We let them flow through here to the LB as
1798
                // we later check for these error codes when investigating pick results in
1799
                // GrpcUtil.getTransportFromPickResult().
1800
                onError(configOrError.getError());
1✔
1801
                if (resolutionResultListener != null) {
1✔
1802
                  resolutionResultListener.resolutionAttempted(false);
1✔
1803
                }
1804
                return;
1✔
1805
              } else {
1806
                effectiveServiceConfig = lastServiceConfig;
1✔
1807
              }
1808
            } else {
1809
              effectiveServiceConfig = EMPTY_SERVICE_CONFIG;
1✔
1810
              realChannel.updateConfigSelector(null);
1✔
1811
            }
1812
            if (!effectiveServiceConfig.equals(lastServiceConfig)) {
1✔
1813
              channelLogger.log(
1✔
1814
                  ChannelLogLevel.INFO,
1815
                  "Service config changed{0}",
1816
                  effectiveServiceConfig == EMPTY_SERVICE_CONFIG ? " to empty" : "");
1✔
1817
              lastServiceConfig = effectiveServiceConfig;
1✔
1818
              transportProvider.throttle = effectiveServiceConfig.getRetryThrottling();
1✔
1819
            }
1820

1821
            try {
1822
              // TODO(creamsoup): when `servers` is empty and lastResolutionStateCopy == SUCCESS
1823
              //  and lbNeedAddress, it shouldn't call the handleServiceConfigUpdate. But,
1824
              //  lbNeedAddress is not deterministic
1825
              serviceConfigUpdated = true;
1✔
1826
            } catch (RuntimeException re) {
×
1827
              logger.log(
×
1828
                  Level.WARNING,
1829
                  "[" + getLogId() + "] Unexpected exception from parsing service config",
×
1830
                  re);
1831
            }
1✔
1832
          }
1833

1834
          Attributes effectiveAttrs = resolutionResult.getAttributes();
1✔
1835
          // Call LB only if it's not shutdown.  If LB is shutdown, lbHelper won't match.
1836
          if (NameResolverListener.this.helper == ManagedChannelImpl.this.lbHelper) {
1✔
1837
            Attributes.Builder attrBuilder =
1✔
1838
                effectiveAttrs.toBuilder().discard(InternalConfigSelector.KEY);
1✔
1839
            Map<String, ?> healthCheckingConfig =
1✔
1840
                effectiveServiceConfig.getHealthCheckingConfig();
1✔
1841
            if (healthCheckingConfig != null) {
1✔
1842
              attrBuilder
1✔
1843
                  .set(LoadBalancer.ATTR_HEALTH_CHECKING_CONFIG, healthCheckingConfig)
1✔
1844
                  .build();
1✔
1845
            }
1846
            Attributes attributes = attrBuilder.build();
1✔
1847

1848
            boolean lastAddressesAccepted = helper.lb.tryAcceptResolvedAddresses(
1✔
1849
                ResolvedAddresses.newBuilder()
1✔
1850
                    .setAddresses(servers)
1✔
1851
                    .setAttributes(attributes)
1✔
1852
                    .setLoadBalancingPolicyConfig(effectiveServiceConfig.getLoadBalancingConfig())
1✔
1853
                    .build());
1✔
1854
            // If a listener is provided, let it know if the addresses were accepted.
1855
            if (resolutionResultListener != null) {
1✔
1856
              resolutionResultListener.resolutionAttempted(lastAddressesAccepted);
1✔
1857
            }
1858
          }
1859
        }
1✔
1860
      }
1861

1862
      syncContext.execute(new NamesResolved());
1✔
1863
    }
1✔
1864

1865
    @Override
1866
    public void onError(final Status error) {
1867
      checkArgument(!error.isOk(), "the error status must not be OK");
1✔
1868
      final class NameResolverErrorHandler implements Runnable {
1✔
1869
        @Override
1870
        public void run() {
1871
          handleErrorInSyncContext(error);
1✔
1872
        }
1✔
1873
      }
1874

1875
      syncContext.execute(new NameResolverErrorHandler());
1✔
1876
    }
1✔
1877

1878
    private void handleErrorInSyncContext(Status error) {
1879
      logger.log(Level.WARNING, "[{0}] Failed to resolve name. status={1}",
1✔
1880
          new Object[] {getLogId(), error});
1✔
1881
      realChannel.onConfigError();
1✔
1882
      if (lastResolutionState != ResolutionState.ERROR) {
1✔
1883
        channelLogger.log(ChannelLogLevel.WARNING, "Failed to resolve name: {0}", error);
1✔
1884
        lastResolutionState = ResolutionState.ERROR;
1✔
1885
      }
1886
      // Call LB only if it's not shutdown.  If LB is shutdown, lbHelper won't match.
1887
      if (NameResolverListener.this.helper != ManagedChannelImpl.this.lbHelper) {
1✔
1888
        return;
1✔
1889
      }
1890

1891
      helper.lb.handleNameResolutionError(error);
1✔
1892
    }
1✔
1893
  }
1894

1895
  private final class SubchannelImpl extends AbstractSubchannel {
1896
    final CreateSubchannelArgs args;
1897
    final InternalLogId subchannelLogId;
1898
    final ChannelLoggerImpl subchannelLogger;
1899
    final ChannelTracer subchannelTracer;
1900
    List<EquivalentAddressGroup> addressGroups;
1901
    InternalSubchannel subchannel;
1902
    boolean started;
1903
    boolean shutdown;
1904
    ScheduledHandle delayedShutdownTask;
1905

1906
    SubchannelImpl(CreateSubchannelArgs args) {
1✔
1907
      checkNotNull(args, "args");
1✔
1908
      addressGroups = args.getAddresses();
1✔
1909
      if (authorityOverride != null) {
1✔
1910
        List<EquivalentAddressGroup> eagsWithoutOverrideAttr =
1✔
1911
            stripOverrideAuthorityAttributes(args.getAddresses());
1✔
1912
        args = args.toBuilder().setAddresses(eagsWithoutOverrideAttr).build();
1✔
1913
      }
1914
      this.args = args;
1✔
1915
      subchannelLogId = InternalLogId.allocate("Subchannel", /*details=*/ authority());
1✔
1916
      subchannelTracer = new ChannelTracer(
1✔
1917
          subchannelLogId, maxTraceEvents, timeProvider.currentTimeNanos(),
1✔
1918
          "Subchannel for " + args.getAddresses());
1✔
1919
      subchannelLogger = new ChannelLoggerImpl(subchannelTracer, timeProvider);
1✔
1920
    }
1✔
1921

1922
    @Override
1923
    public void start(final SubchannelStateListener listener) {
1924
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1925
      checkState(!started, "already started");
1✔
1926
      checkState(!shutdown, "already shutdown");
1✔
1927
      checkState(!terminating, "Channel is being terminated");
1✔
1928
      started = true;
1✔
1929
      final class ManagedInternalSubchannelCallback extends InternalSubchannel.Callback {
1✔
1930
        // All callbacks are run in syncContext
1931
        @Override
1932
        void onTerminated(InternalSubchannel is) {
1933
          subchannels.remove(is);
1✔
1934
          channelz.removeSubchannel(is);
1✔
1935
          maybeTerminateChannel();
1✔
1936
        }
1✔
1937

1938
        @Override
1939
        void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) {
1940
          checkState(listener != null, "listener is null");
1✔
1941
          listener.onSubchannelState(newState);
1✔
1942
        }
1✔
1943

1944
        @Override
1945
        void onInUse(InternalSubchannel is) {
1946
          inUseStateAggregator.updateObjectInUse(is, true);
1✔
1947
        }
1✔
1948

1949
        @Override
1950
        void onNotInUse(InternalSubchannel is) {
1951
          inUseStateAggregator.updateObjectInUse(is, false);
1✔
1952
        }
1✔
1953
      }
1954

1955
      final InternalSubchannel internalSubchannel = new InternalSubchannel(
1✔
1956
          args.getAddresses(),
1✔
1957
          authority(),
1✔
1958
          userAgent,
1✔
1959
          backoffPolicyProvider,
1✔
1960
          transportFactory,
1✔
1961
          transportFactory.getScheduledExecutorService(),
1✔
1962
          stopwatchSupplier,
1✔
1963
          syncContext,
1964
          new ManagedInternalSubchannelCallback(),
1965
          channelz,
1✔
1966
          callTracerFactory.create(),
1✔
1967
          subchannelTracer,
1968
          subchannelLogId,
1969
          subchannelLogger);
1970

1971
      channelTracer.reportEvent(new ChannelTrace.Event.Builder()
1✔
1972
          .setDescription("Child Subchannel started")
1✔
1973
          .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
1✔
1974
          .setTimestampNanos(timeProvider.currentTimeNanos())
1✔
1975
          .setSubchannelRef(internalSubchannel)
1✔
1976
          .build());
1✔
1977

1978
      this.subchannel = internalSubchannel;
1✔
1979
      channelz.addSubchannel(internalSubchannel);
1✔
1980
      subchannels.add(internalSubchannel);
1✔
1981
    }
1✔
1982

1983
    @Override
1984
    InternalInstrumented<ChannelStats> getInstrumentedInternalSubchannel() {
1985
      checkState(started, "not started");
1✔
1986
      return subchannel;
1✔
1987
    }
1988

1989
    @Override
1990
    public void shutdown() {
1991
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1992
      if (subchannel == null) {
1✔
1993
        // start() was not successful
1994
        shutdown = true;
×
1995
        return;
×
1996
      }
1997
      if (shutdown) {
1✔
1998
        if (terminating && delayedShutdownTask != null) {
1✔
1999
          // shutdown() was previously called when terminating == false, thus a delayed shutdown()
2000
          // was scheduled.  Now since terminating == true, We should expedite the shutdown.
2001
          delayedShutdownTask.cancel();
×
2002
          delayedShutdownTask = null;
×
2003
          // Will fall through to the subchannel.shutdown() at the end.
2004
        } else {
2005
          return;
1✔
2006
        }
2007
      } else {
2008
        shutdown = true;
1✔
2009
      }
2010
      // Add a delay to shutdown to deal with the race between 1) a transport being picked and
2011
      // newStream() being called on it, and 2) its Subchannel is shut down by LoadBalancer (e.g.,
2012
      // because of address change, or because LoadBalancer is shutdown by Channel entering idle
2013
      // mode). If (2) wins, the app will see a spurious error. We work around this by delaying
2014
      // shutdown of Subchannel for a few seconds here.
2015
      //
2016
      // TODO(zhangkun83): consider a better approach
2017
      // (https://github.com/grpc/grpc-java/issues/2562).
2018
      if (!terminating) {
1✔
2019
        final class ShutdownSubchannel implements Runnable {
1✔
2020
          @Override
2021
          public void run() {
2022
            subchannel.shutdown(SUBCHANNEL_SHUTDOWN_STATUS);
1✔
2023
          }
1✔
2024
        }
2025

2026
        delayedShutdownTask = syncContext.schedule(
1✔
2027
            new LogExceptionRunnable(new ShutdownSubchannel()),
2028
            SUBCHANNEL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS,
2029
            transportFactory.getScheduledExecutorService());
1✔
2030
        return;
1✔
2031
      }
2032
      // When terminating == true, no more real streams will be created. It's safe and also
2033
      // desirable to shutdown timely.
2034
      subchannel.shutdown(SHUTDOWN_STATUS);
1✔
2035
    }
1✔
2036

2037
    @Override
2038
    public void requestConnection() {
2039
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
2040
      checkState(started, "not started");
1✔
2041
      subchannel.obtainActiveTransport();
1✔
2042
    }
1✔
2043

2044
    @Override
2045
    public List<EquivalentAddressGroup> getAllAddresses() {
2046
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
2047
      checkState(started, "not started");
1✔
2048
      return addressGroups;
1✔
2049
    }
2050

2051
    @Override
2052
    public Attributes getAttributes() {
2053
      return args.getAttributes();
1✔
2054
    }
2055

2056
    @Override
2057
    public String toString() {
2058
      return subchannelLogId.toString();
1✔
2059
    }
2060

2061
    @Override
2062
    public Channel asChannel() {
2063
      checkState(started, "not started");
1✔
2064
      return new SubchannelChannel(
1✔
2065
          subchannel, balancerRpcExecutorHolder.getExecutor(),
1✔
2066
          transportFactory.getScheduledExecutorService(),
1✔
2067
          callTracerFactory.create(),
1✔
2068
          new AtomicReference<InternalConfigSelector>(null));
2069
    }
2070

2071
    @Override
2072
    public Object getInternalSubchannel() {
2073
      checkState(started, "Subchannel is not started");
1✔
2074
      return subchannel;
1✔
2075
    }
2076

2077
    @Override
2078
    public ChannelLogger getChannelLogger() {
2079
      return subchannelLogger;
1✔
2080
    }
2081

2082
    @Override
2083
    public void updateAddresses(List<EquivalentAddressGroup> addrs) {
2084
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
2085
      addressGroups = addrs;
1✔
2086
      if (authorityOverride != null) {
1✔
2087
        addrs = stripOverrideAuthorityAttributes(addrs);
1✔
2088
      }
2089
      subchannel.updateAddresses(addrs);
1✔
2090
    }
1✔
2091

2092
    private List<EquivalentAddressGroup> stripOverrideAuthorityAttributes(
2093
        List<EquivalentAddressGroup> eags) {
2094
      List<EquivalentAddressGroup> eagsWithoutOverrideAttr = new ArrayList<>();
1✔
2095
      for (EquivalentAddressGroup eag : eags) {
1✔
2096
        EquivalentAddressGroup eagWithoutOverrideAttr = new EquivalentAddressGroup(
1✔
2097
            eag.getAddresses(),
1✔
2098
            eag.getAttributes().toBuilder().discard(ATTR_AUTHORITY_OVERRIDE).build());
1✔
2099
        eagsWithoutOverrideAttr.add(eagWithoutOverrideAttr);
1✔
2100
      }
1✔
2101
      return Collections.unmodifiableList(eagsWithoutOverrideAttr);
1✔
2102
    }
2103
  }
2104

2105
  @Override
2106
  public String toString() {
2107
    return MoreObjects.toStringHelper(this)
1✔
2108
        .add("logId", logId.getId())
1✔
2109
        .add("target", target)
1✔
2110
        .toString();
1✔
2111
  }
2112

2113
  /**
2114
   * Called from syncContext.
2115
   */
2116
  private final class DelayedTransportListener implements ManagedClientTransport.Listener {
1✔
2117
    @Override
2118
    public void transportShutdown(Status s) {
2119
      checkState(shutdown.get(), "Channel must have been shut down");
1✔
2120
    }
1✔
2121

2122
    @Override
2123
    public void transportReady() {
2124
      // Don't care
2125
    }
×
2126

2127
    @Override
2128
    public void transportInUse(final boolean inUse) {
2129
      inUseStateAggregator.updateObjectInUse(delayedTransport, inUse);
1✔
2130
    }
1✔
2131

2132
    @Override
2133
    public void transportTerminated() {
2134
      checkState(shutdown.get(), "Channel must have been shut down");
1✔
2135
      terminating = true;
1✔
2136
      shutdownNameResolverAndLoadBalancer(false);
1✔
2137
      // No need to call channelStateManager since we are already in SHUTDOWN state.
2138
      // Until LoadBalancer is shutdown, it may still create new subchannels.  We catch them
2139
      // here.
2140
      maybeShutdownNowSubchannels();
1✔
2141
      maybeTerminateChannel();
1✔
2142
    }
1✔
2143
  }
2144

2145
  /**
2146
   * Must be accessed from syncContext.
2147
   */
2148
  private final class IdleModeStateAggregator extends InUseStateAggregator<Object> {
1✔
2149
    @Override
2150
    protected void handleInUse() {
2151
      exitIdleMode();
1✔
2152
    }
1✔
2153

2154
    @Override
2155
    protected void handleNotInUse() {
2156
      if (shutdown.get()) {
1✔
2157
        return;
1✔
2158
      }
2159
      rescheduleIdleTimer();
1✔
2160
    }
1✔
2161
  }
2162

2163
  /**
2164
   * Lazily request for Executor from an executor pool.
2165
   * Also act as an Executor directly to simply run a cmd
2166
   */
2167
  @VisibleForTesting
2168
  static final class ExecutorHolder implements Executor {
2169
    private final ObjectPool<? extends Executor> pool;
2170
    private Executor executor;
2171

2172
    ExecutorHolder(ObjectPool<? extends Executor> executorPool) {
1✔
2173
      this.pool = checkNotNull(executorPool, "executorPool");
1✔
2174
    }
1✔
2175

2176
    synchronized Executor getExecutor() {
2177
      if (executor == null) {
1✔
2178
        executor = checkNotNull(pool.getObject(), "%s.getObject()", executor);
1✔
2179
      }
2180
      return executor;
1✔
2181
    }
2182

2183
    synchronized void release() {
2184
      if (executor != null) {
1✔
2185
        executor = pool.returnObject(executor);
1✔
2186
      }
2187
    }
1✔
2188

2189
    @Override
2190
    public void execute(Runnable command) {
2191
      getExecutor().execute(command);
1✔
2192
    }
1✔
2193
  }
2194

2195
  private static final class RestrictedScheduledExecutor implements ScheduledExecutorService {
2196
    final ScheduledExecutorService delegate;
2197

2198
    private RestrictedScheduledExecutor(ScheduledExecutorService delegate) {
1✔
2199
      this.delegate = checkNotNull(delegate, "delegate");
1✔
2200
    }
1✔
2201

2202
    @Override
2203
    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
2204
      return delegate.schedule(callable, delay, unit);
×
2205
    }
2206

2207
    @Override
2208
    public ScheduledFuture<?> schedule(Runnable cmd, long delay, TimeUnit unit) {
2209
      return delegate.schedule(cmd, delay, unit);
1✔
2210
    }
2211

2212
    @Override
2213
    public ScheduledFuture<?> scheduleAtFixedRate(
2214
        Runnable command, long initialDelay, long period, TimeUnit unit) {
2215
      return delegate.scheduleAtFixedRate(command, initialDelay, period, unit);
×
2216
    }
2217

2218
    @Override
2219
    public ScheduledFuture<?> scheduleWithFixedDelay(
2220
        Runnable command, long initialDelay, long delay, TimeUnit unit) {
2221
      return delegate.scheduleWithFixedDelay(command, initialDelay, delay, unit);
×
2222
    }
2223

2224
    @Override
2225
    public boolean awaitTermination(long timeout, TimeUnit unit)
2226
        throws InterruptedException {
2227
      return delegate.awaitTermination(timeout, unit);
×
2228
    }
2229

2230
    @Override
2231
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
2232
        throws InterruptedException {
2233
      return delegate.invokeAll(tasks);
×
2234
    }
2235

2236
    @Override
2237
    public <T> List<Future<T>> invokeAll(
2238
        Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
2239
        throws InterruptedException {
2240
      return delegate.invokeAll(tasks, timeout, unit);
×
2241
    }
2242

2243
    @Override
2244
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
2245
        throws InterruptedException, ExecutionException {
2246
      return delegate.invokeAny(tasks);
×
2247
    }
2248

2249
    @Override
2250
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
2251
        throws InterruptedException, ExecutionException, TimeoutException {
2252
      return delegate.invokeAny(tasks, timeout, unit);
×
2253
    }
2254

2255
    @Override
2256
    public boolean isShutdown() {
2257
      return delegate.isShutdown();
×
2258
    }
2259

2260
    @Override
2261
    public boolean isTerminated() {
2262
      return delegate.isTerminated();
×
2263
    }
2264

2265
    @Override
2266
    public void shutdown() {
2267
      throw new UnsupportedOperationException("Restricted: shutdown() is not allowed");
1✔
2268
    }
2269

2270
    @Override
2271
    public List<Runnable> shutdownNow() {
2272
      throw new UnsupportedOperationException("Restricted: shutdownNow() is not allowed");
1✔
2273
    }
2274

2275
    @Override
2276
    public <T> Future<T> submit(Callable<T> task) {
2277
      return delegate.submit(task);
×
2278
    }
2279

2280
    @Override
2281
    public Future<?> submit(Runnable task) {
2282
      return delegate.submit(task);
×
2283
    }
2284

2285
    @Override
2286
    public <T> Future<T> submit(Runnable task, T result) {
2287
      return delegate.submit(task, result);
×
2288
    }
2289

2290
    @Override
2291
    public void execute(Runnable command) {
2292
      delegate.execute(command);
×
2293
    }
×
2294
  }
2295

2296
  /**
2297
   * A ResolutionState indicates the status of last name resolution.
2298
   */
2299
  enum ResolutionState {
1✔
2300
    NO_RESOLUTION,
1✔
2301
    SUCCESS,
1✔
2302
    ERROR
1✔
2303
  }
2304
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc