• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #18698

pending completion
#18698

push

github-actions

web-flow
core: ManagedChannelImpl to always use RetryingNameResolver (#10328) (#10331)

ManagedCahnnelImpl did not make sure to use a RetryingNameResolver if
authority was not overriden. This was not a problem for DNS name
resolution as the DNS name resolver factory explicitly returns a
RetryingNameResolver. For polling name resolvers that do not do this in
their factories (like the grpclb name resolver) this meant not having retry
at all.

30833 of 34988 relevant lines covered (88.12%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.49
/../core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
1
/*
2
 * Copyright 2016 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static com.google.common.base.Preconditions.checkState;
22
import static io.grpc.ClientStreamTracer.NAME_RESOLUTION_DELAYED;
23
import static io.grpc.ConnectivityState.IDLE;
24
import static io.grpc.ConnectivityState.SHUTDOWN;
25
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
26
import static io.grpc.EquivalentAddressGroup.ATTR_AUTHORITY_OVERRIDE;
27

28
import com.google.common.annotations.VisibleForTesting;
29
import com.google.common.base.MoreObjects;
30
import com.google.common.base.Stopwatch;
31
import com.google.common.base.Supplier;
32
import com.google.common.util.concurrent.ListenableFuture;
33
import com.google.common.util.concurrent.SettableFuture;
34
import io.grpc.Attributes;
35
import io.grpc.CallCredentials;
36
import io.grpc.CallOptions;
37
import io.grpc.Channel;
38
import io.grpc.ChannelCredentials;
39
import io.grpc.ChannelLogger;
40
import io.grpc.ChannelLogger.ChannelLogLevel;
41
import io.grpc.ClientCall;
42
import io.grpc.ClientInterceptor;
43
import io.grpc.ClientInterceptors;
44
import io.grpc.ClientStreamTracer;
45
import io.grpc.CompressorRegistry;
46
import io.grpc.ConnectivityState;
47
import io.grpc.ConnectivityStateInfo;
48
import io.grpc.Context;
49
import io.grpc.DecompressorRegistry;
50
import io.grpc.EquivalentAddressGroup;
51
import io.grpc.ForwardingChannelBuilder;
52
import io.grpc.ForwardingClientCall;
53
import io.grpc.Grpc;
54
import io.grpc.InternalChannelz;
55
import io.grpc.InternalChannelz.ChannelStats;
56
import io.grpc.InternalChannelz.ChannelTrace;
57
import io.grpc.InternalConfigSelector;
58
import io.grpc.InternalInstrumented;
59
import io.grpc.InternalLogId;
60
import io.grpc.InternalWithLogId;
61
import io.grpc.LoadBalancer;
62
import io.grpc.LoadBalancer.CreateSubchannelArgs;
63
import io.grpc.LoadBalancer.PickResult;
64
import io.grpc.LoadBalancer.PickSubchannelArgs;
65
import io.grpc.LoadBalancer.ResolvedAddresses;
66
import io.grpc.LoadBalancer.SubchannelPicker;
67
import io.grpc.LoadBalancer.SubchannelStateListener;
68
import io.grpc.ManagedChannel;
69
import io.grpc.ManagedChannelBuilder;
70
import io.grpc.Metadata;
71
import io.grpc.MethodDescriptor;
72
import io.grpc.NameResolver;
73
import io.grpc.NameResolver.ConfigOrError;
74
import io.grpc.NameResolver.ResolutionResult;
75
import io.grpc.NameResolverRegistry;
76
import io.grpc.ProxyDetector;
77
import io.grpc.Status;
78
import io.grpc.SynchronizationContext;
79
import io.grpc.SynchronizationContext.ScheduledHandle;
80
import io.grpc.internal.AutoConfiguredLoadBalancerFactory.AutoConfiguredLoadBalancer;
81
import io.grpc.internal.ClientCallImpl.ClientStreamProvider;
82
import io.grpc.internal.ClientTransportFactory.SwapChannelCredentialsResult;
83
import io.grpc.internal.ManagedChannelImplBuilder.ClientTransportFactoryBuilder;
84
import io.grpc.internal.ManagedChannelImplBuilder.FixedPortProvider;
85
import io.grpc.internal.ManagedChannelServiceConfig.MethodInfo;
86
import io.grpc.internal.ManagedChannelServiceConfig.ServiceConfigConvertedSelector;
87
import io.grpc.internal.RetriableStream.ChannelBufferMeter;
88
import io.grpc.internal.RetriableStream.Throttle;
89
import io.grpc.internal.RetryingNameResolver.ResolutionResultListener;
90
import java.net.URI;
91
import java.net.URISyntaxException;
92
import java.util.ArrayList;
93
import java.util.Collection;
94
import java.util.Collections;
95
import java.util.HashSet;
96
import java.util.LinkedHashSet;
97
import java.util.List;
98
import java.util.Map;
99
import java.util.Set;
100
import java.util.concurrent.Callable;
101
import java.util.concurrent.CountDownLatch;
102
import java.util.concurrent.ExecutionException;
103
import java.util.concurrent.Executor;
104
import java.util.concurrent.Future;
105
import java.util.concurrent.ScheduledExecutorService;
106
import java.util.concurrent.ScheduledFuture;
107
import java.util.concurrent.TimeUnit;
108
import java.util.concurrent.TimeoutException;
109
import java.util.concurrent.atomic.AtomicBoolean;
110
import java.util.concurrent.atomic.AtomicReference;
111
import java.util.logging.Level;
112
import java.util.logging.Logger;
113
import java.util.regex.Pattern;
114
import javax.annotation.Nullable;
115
import javax.annotation.concurrent.GuardedBy;
116
import javax.annotation.concurrent.ThreadSafe;
117

118
/** A communication channel for making outgoing RPCs. */
119
@ThreadSafe
120
final class ManagedChannelImpl extends ManagedChannel implements
121
    InternalInstrumented<ChannelStats> {
122
  @VisibleForTesting
123
  static final Logger logger = Logger.getLogger(ManagedChannelImpl.class.getName());
1✔
124

125
  // Matching this pattern means the target string is a URI target or at least intended to be one.
126
  // A URI target must be an absolute hierarchical URI.
127
  // From RFC 2396: scheme = alpha *( alpha | digit | "+" | "-" | "." )
128
  @VisibleForTesting
129
  static final Pattern URI_PATTERN = Pattern.compile("[a-zA-Z][a-zA-Z0-9+.-]*:/.*");
1✔
130

131
  static final long IDLE_TIMEOUT_MILLIS_DISABLE = -1;
132

133
  static final long SUBCHANNEL_SHUTDOWN_DELAY_SECONDS = 5;
134

135
  @VisibleForTesting
136
  static final Status SHUTDOWN_NOW_STATUS =
1✔
137
      Status.UNAVAILABLE.withDescription("Channel shutdownNow invoked");
1✔
138

139
  @VisibleForTesting
140
  static final Status SHUTDOWN_STATUS =
1✔
141
      Status.UNAVAILABLE.withDescription("Channel shutdown invoked");
1✔
142

143
  @VisibleForTesting
144
  static final Status SUBCHANNEL_SHUTDOWN_STATUS =
1✔
145
      Status.UNAVAILABLE.withDescription("Subchannel shutdown invoked");
1✔
146

147
  private static final ManagedChannelServiceConfig EMPTY_SERVICE_CONFIG =
148
      ManagedChannelServiceConfig.empty();
1✔
149
  private static final InternalConfigSelector INITIAL_PENDING_SELECTOR =
1✔
150
      new InternalConfigSelector() {
1✔
151
        @Override
152
        public Result selectConfig(PickSubchannelArgs args) {
153
          throw new IllegalStateException("Resolution is pending");
×
154
        }
155
      };
156

157
  private final InternalLogId logId;
158
  private final String target;
159
  @Nullable
160
  private final String authorityOverride;
161
  private final NameResolverRegistry nameResolverRegistry;
162
  private final NameResolver.Factory nameResolverFactory;
163
  private final NameResolver.Args nameResolverArgs;
164
  private final AutoConfiguredLoadBalancerFactory loadBalancerFactory;
165
  private final ClientTransportFactory originalTransportFactory;
166
  @Nullable
167
  private final ChannelCredentials originalChannelCreds;
168
  private final ClientTransportFactory transportFactory;
169
  private final ClientTransportFactory oobTransportFactory;
170
  private final RestrictedScheduledExecutor scheduledExecutor;
171
  private final Executor executor;
172
  private final ObjectPool<? extends Executor> executorPool;
173
  private final ObjectPool<? extends Executor> balancerRpcExecutorPool;
174
  private final ExecutorHolder balancerRpcExecutorHolder;
175
  private final ExecutorHolder offloadExecutorHolder;
176
  private final TimeProvider timeProvider;
177
  private final int maxTraceEvents;
178

179
  @VisibleForTesting
1✔
180
  final SynchronizationContext syncContext = new SynchronizationContext(
181
      new Thread.UncaughtExceptionHandler() {
1✔
182
        @Override
183
        public void uncaughtException(Thread t, Throwable e) {
184
          logger.log(
1✔
185
              Level.SEVERE,
186
              "[" + getLogId() + "] Uncaught exception in the SynchronizationContext. Panic!",
1✔
187
              e);
188
          panic(e);
1✔
189
        }
1✔
190
      });
191

192
  private boolean fullStreamDecompression;
193

194
  private final DecompressorRegistry decompressorRegistry;
195
  private final CompressorRegistry compressorRegistry;
196

197
  private final Supplier<Stopwatch> stopwatchSupplier;
198
  /** The timout before entering idle mode. */
199
  private final long idleTimeoutMillis;
200

201
  private final ConnectivityStateManager channelStateManager = new ConnectivityStateManager();
1✔
202
  private final BackoffPolicy.Provider backoffPolicyProvider;
203

204
  /**
205
   * We delegate to this channel, so that we can have interceptors as necessary. If there aren't
206
   * any interceptors and the {@link io.grpc.BinaryLog} is {@code null} then this will just be a
207
   * {@link RealChannel}.
208
   */
209
  private final Channel interceptorChannel;
210
  @Nullable private final String userAgent;
211

212
  // Only null after channel is terminated. Must be assigned from the syncContext.
213
  private NameResolver nameResolver;
214

215
  // Must be accessed from the syncContext.
216
  private boolean nameResolverStarted;
217

218
  // null when channel is in idle mode.  Must be assigned from syncContext.
219
  @Nullable
220
  private LbHelperImpl lbHelper;
221

222
  // Must ONLY be assigned from updateSubchannelPicker(), which is called from syncContext.
223
  // null if channel is in idle mode.
224
  @Nullable
225
  private volatile SubchannelPicker subchannelPicker;
226

227
  // Must be accessed from the syncContext
228
  private boolean panicMode;
229

230
  // Must be mutated from syncContext
231
  // If any monitoring hook to be added later needs to get a snapshot of this Set, we could
232
  // switch to a ConcurrentHashMap.
233
  private final Set<InternalSubchannel> subchannels = new HashSet<>(16, .75f);
1✔
234

235
  // Must be accessed from syncContext
236
  @Nullable
237
  private Collection<RealChannel.PendingCall<?, ?>> pendingCalls;
238
  private final Object pendingCallsInUseObject = new Object();
1✔
239

240
  // Must be mutated from syncContext
241
  private final Set<OobChannel> oobChannels = new HashSet<>(1, .75f);
1✔
242

243
  // reprocess() must be run from syncContext
244
  private final DelayedClientTransport delayedTransport;
245
  private final UncommittedRetriableStreamsRegistry uncommittedRetriableStreamsRegistry
1✔
246
      = new UncommittedRetriableStreamsRegistry();
247

248
  // Shutdown states.
249
  //
250
  // Channel's shutdown process:
251
  // 1. shutdown(): stop accepting new calls from applications
252
  //   1a shutdown <- true
253
  //   1b subchannelPicker <- null
254
  //   1c delayedTransport.shutdown()
255
  // 2. delayedTransport terminated: stop stream-creation functionality
256
  //   2a terminating <- true
257
  //   2b loadBalancer.shutdown()
258
  //     * LoadBalancer will shutdown subchannels and OOB channels
259
  //   2c loadBalancer <- null
260
  //   2d nameResolver.shutdown()
261
  //   2e nameResolver <- null
262
  // 3. All subchannels and OOB channels terminated: Channel considered terminated
263

264
  private final AtomicBoolean shutdown = new AtomicBoolean(false);
1✔
265
  // Must only be mutated and read from syncContext
266
  private boolean shutdownNowed;
267
  // Must only be mutated from syncContext
268
  private boolean terminating;
269
  // Must be mutated from syncContext
270
  private volatile boolean terminated;
271
  private final CountDownLatch terminatedLatch = new CountDownLatch(1);
1✔
272

273
  private final CallTracer.Factory callTracerFactory;
274
  private final CallTracer channelCallTracer;
275
  private final ChannelTracer channelTracer;
276
  private final ChannelLogger channelLogger;
277
  private final InternalChannelz channelz;
278
  private final RealChannel realChannel;
279
  // Must be mutated and read from syncContext
280
  // a flag for doing channel tracing when flipped
281
  private ResolutionState lastResolutionState = ResolutionState.NO_RESOLUTION;
1✔
282
  // Must be mutated and read from constructor or syncContext
283
  // used for channel tracing when value changed
284
  private ManagedChannelServiceConfig lastServiceConfig = EMPTY_SERVICE_CONFIG;
1✔
285

286
  @Nullable
287
  private final ManagedChannelServiceConfig defaultServiceConfig;
288
  // Must be mutated and read from constructor or syncContext
289
  private boolean serviceConfigUpdated = false;
1✔
290
  private final boolean lookUpServiceConfig;
291

292
  // One instance per channel.
293
  private final ChannelBufferMeter channelBufferUsed = new ChannelBufferMeter();
1✔
294

295
  private final long perRpcBufferLimit;
296
  private final long channelBufferLimit;
297

298
  // Temporary false flag that can skip the retry code path.
299
  private final boolean retryEnabled;
300

301
  // Called from syncContext
302
  private final ManagedClientTransport.Listener delayedTransportListener =
1✔
303
      new DelayedTransportListener();
304

305
  // Must be called from syncContext
306
  private void maybeShutdownNowSubchannels() {
307
    if (shutdownNowed) {
1✔
308
      for (InternalSubchannel subchannel : subchannels) {
1✔
309
        subchannel.shutdownNow(SHUTDOWN_NOW_STATUS);
1✔
310
      }
1✔
311
      for (OobChannel oobChannel : oobChannels) {
1✔
312
        oobChannel.getInternalSubchannel().shutdownNow(SHUTDOWN_NOW_STATUS);
1✔
313
      }
1✔
314
    }
315
  }
1✔
316

317
  // Must be accessed from syncContext
318
  @VisibleForTesting
1✔
319
  final InUseStateAggregator<Object> inUseStateAggregator = new IdleModeStateAggregator();
320

321
  @Override
322
  public ListenableFuture<ChannelStats> getStats() {
323
    final SettableFuture<ChannelStats> ret = SettableFuture.create();
1✔
324
    final class StatsFetcher implements Runnable {
1✔
325
      @Override
326
      public void run() {
327
        ChannelStats.Builder builder = new InternalChannelz.ChannelStats.Builder();
1✔
328
        channelCallTracer.updateBuilder(builder);
1✔
329
        channelTracer.updateBuilder(builder);
1✔
330
        builder.setTarget(target).setState(channelStateManager.getState());
1✔
331
        List<InternalWithLogId> children = new ArrayList<>();
1✔
332
        children.addAll(subchannels);
1✔
333
        children.addAll(oobChannels);
1✔
334
        builder.setSubchannels(children);
1✔
335
        ret.set(builder.build());
1✔
336
      }
1✔
337
    }
338

339
    // subchannels and oobchannels can only be accessed from syncContext
340
    syncContext.execute(new StatsFetcher());
1✔
341
    return ret;
1✔
342
  }
343

344
  @Override
345
  public InternalLogId getLogId() {
346
    return logId;
1✔
347
  }
348

349
  // Run from syncContext
350
  private class IdleModeTimer implements Runnable {
1✔
351

352
    @Override
353
    public void run() {
354
      // Workaround timer scheduled while in idle mode. This can happen from handleNotInUse() after
355
      // an explicit enterIdleMode() by the user. Protecting here as other locations are a bit too
356
      // subtle to change rapidly to resolve the channel panic. See #8714
357
      if (lbHelper == null) {
1✔
358
        return;
×
359
      }
360
      enterIdleMode();
1✔
361
    }
1✔
362
  }
363

364
  // Must be called from syncContext
365
  private void shutdownNameResolverAndLoadBalancer(boolean channelIsActive) {
366
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
367
    if (channelIsActive) {
1✔
368
      checkState(nameResolverStarted, "nameResolver is not started");
1✔
369
      checkState(lbHelper != null, "lbHelper is null");
1✔
370
    }
371
    if (nameResolver != null) {
1✔
372
      nameResolver.shutdown();
1✔
373
      nameResolverStarted = false;
1✔
374
      if (channelIsActive) {
1✔
375
        nameResolver = getNameResolver(
1✔
376
            target, authorityOverride, nameResolverFactory, nameResolverArgs);
377
      } else {
378
        nameResolver = null;
1✔
379
      }
380
    }
381
    if (lbHelper != null) {
1✔
382
      lbHelper.lb.shutdown();
1✔
383
      lbHelper = null;
1✔
384
    }
385
    subchannelPicker = null;
1✔
386
  }
1✔
387

388
  /**
389
   * Make the channel exit idle mode, if it's in it.
390
   *
391
   * <p>Must be called from syncContext
392
   */
393
  @VisibleForTesting
394
  void exitIdleMode() {
395
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
396
    if (shutdown.get() || panicMode) {
1✔
397
      return;
1✔
398
    }
399
    if (inUseStateAggregator.isInUse()) {
1✔
400
      // Cancel the timer now, so that a racing due timer will not put Channel on idleness
401
      // when the caller of exitIdleMode() is about to use the returned loadBalancer.
402
      cancelIdleTimer(false);
1✔
403
    } else {
404
      // exitIdleMode() may be called outside of inUseStateAggregator.handleNotInUse() while
405
      // isInUse() == false, in which case we still need to schedule the timer.
406
      rescheduleIdleTimer();
1✔
407
    }
408
    if (lbHelper != null) {
1✔
409
      return;
1✔
410
    }
411
    channelLogger.log(ChannelLogLevel.INFO, "Exiting idle mode");
1✔
412
    LbHelperImpl lbHelper = new LbHelperImpl();
1✔
413
    lbHelper.lb = loadBalancerFactory.newLoadBalancer(lbHelper);
1✔
414
    // Delay setting lbHelper until fully initialized, since loadBalancerFactory is user code and
415
    // may throw. We don't want to confuse our state, even if we will enter panic mode.
416
    this.lbHelper = lbHelper;
1✔
417

418
    NameResolverListener listener = new NameResolverListener(lbHelper, nameResolver);
1✔
419
    nameResolver.start(listener);
1✔
420
    nameResolverStarted = true;
1✔
421
  }
1✔
422

423
  // Must be run from syncContext
424
  private void enterIdleMode() {
425
    // nameResolver and loadBalancer are guaranteed to be non-null.  If any of them were null,
426
    // either the idleModeTimer ran twice without exiting the idle mode, or the task in shutdown()
427
    // did not cancel idleModeTimer, or enterIdle() ran while shutdown or in idle, all of
428
    // which are bugs.
429
    shutdownNameResolverAndLoadBalancer(true);
1✔
430
    delayedTransport.reprocess(null);
1✔
431
    channelLogger.log(ChannelLogLevel.INFO, "Entering IDLE state");
1✔
432
    channelStateManager.gotoState(IDLE);
1✔
433
    // If the inUseStateAggregator still considers pending calls to be queued up or the delayed
434
    // transport to be holding some we need to exit idle mode to give these calls a chance to
435
    // be processed.
436
    if (inUseStateAggregator.anyObjectInUse(pendingCallsInUseObject, delayedTransport)) {
1✔
437
      exitIdleMode();
1✔
438
    }
439
  }
1✔
440

441
  // Must be run from syncContext
442
  private void cancelIdleTimer(boolean permanent) {
443
    idleTimer.cancel(permanent);
1✔
444
  }
1✔
445

446
  // Always run from syncContext
447
  private void rescheduleIdleTimer() {
448
    if (idleTimeoutMillis == IDLE_TIMEOUT_MILLIS_DISABLE) {
1✔
449
      return;
1✔
450
    }
451
    idleTimer.reschedule(idleTimeoutMillis, TimeUnit.MILLISECONDS);
1✔
452
  }
1✔
453

454
  /**
455
   * Force name resolution refresh to happen immediately. Must be run
456
   * from syncContext.
457
   */
458
  private void refreshNameResolution() {
459
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
460
    if (nameResolverStarted) {
1✔
461
      nameResolver.refresh();
1✔
462
    }
463
  }
1✔
464

465
  private final class ChannelStreamProvider implements ClientStreamProvider {
1✔
466
    volatile Throttle throttle;
467

468
    private ClientTransport getTransport(PickSubchannelArgs args) {
469
      SubchannelPicker pickerCopy = subchannelPicker;
1✔
470
      if (shutdown.get()) {
1✔
471
        // If channel is shut down, delayedTransport is also shut down which will fail the stream
472
        // properly.
473
        return delayedTransport;
1✔
474
      }
475
      if (pickerCopy == null) {
1✔
476
        final class ExitIdleModeForTransport implements Runnable {
1✔
477
          @Override
478
          public void run() {
479
            exitIdleMode();
1✔
480
          }
1✔
481
        }
482

483
        syncContext.execute(new ExitIdleModeForTransport());
1✔
484
        return delayedTransport;
1✔
485
      }
486
      // There is no need to reschedule the idle timer here.
487
      //
488
      // pickerCopy != null, which means idle timer has not expired when this method starts.
489
      // Even if idle timer expires right after we grab pickerCopy, and it shuts down LoadBalancer
490
      // which calls Subchannel.shutdown(), the InternalSubchannel will be actually shutdown after
491
      // SUBCHANNEL_SHUTDOWN_DELAY_SECONDS, which gives the caller time to start RPC on it.
492
      //
493
      // In most cases the idle timer is scheduled to fire after the transport has created the
494
      // stream, which would have reported in-use state to the channel that would have cancelled
495
      // the idle timer.
496
      PickResult pickResult = pickerCopy.pickSubchannel(args);
1✔
497
      ClientTransport transport = GrpcUtil.getTransportFromPickResult(
1✔
498
          pickResult, args.getCallOptions().isWaitForReady());
1✔
499
      if (transport != null) {
1✔
500
        return transport;
1✔
501
      }
502
      return delayedTransport;
1✔
503
    }
504

505
    @Override
506
    public ClientStream newStream(
507
        final MethodDescriptor<?, ?> method,
508
        final CallOptions callOptions,
509
        final Metadata headers,
510
        final Context context) {
511
      if (!retryEnabled) {
1✔
512
        ClientTransport transport =
1✔
513
            getTransport(new PickSubchannelArgsImpl(method, headers, callOptions));
1✔
514
        Context origContext = context.attach();
1✔
515
        ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers(
1✔
516
            callOptions, headers, 0, /* isTransparentRetry= */ false);
517
        try {
518
          return transport.newStream(method, headers, callOptions, tracers);
1✔
519
        } finally {
520
          context.detach(origContext);
1✔
521
        }
522
      } else {
523
        MethodInfo methodInfo = callOptions.getOption(MethodInfo.KEY);
1✔
524
        final RetryPolicy retryPolicy = methodInfo == null ? null : methodInfo.retryPolicy;
1✔
525
        final HedgingPolicy hedgingPolicy = methodInfo == null ? null : methodInfo.hedgingPolicy;
1✔
526
        final class RetryStream<ReqT> extends RetriableStream<ReqT> {
527
          @SuppressWarnings("unchecked")
528
          RetryStream() {
1✔
529
            super(
1✔
530
                (MethodDescriptor<ReqT, ?>) method,
531
                headers,
532
                channelBufferUsed,
1✔
533
                perRpcBufferLimit,
1✔
534
                channelBufferLimit,
1✔
535
                getCallExecutor(callOptions),
1✔
536
                transportFactory.getScheduledExecutorService(),
1✔
537
                retryPolicy,
538
                hedgingPolicy,
539
                throttle);
540
          }
1✔
541

542
          @Override
543
          Status prestart() {
544
            return uncommittedRetriableStreamsRegistry.add(this);
1✔
545
          }
546

547
          @Override
548
          void postCommit() {
549
            uncommittedRetriableStreamsRegistry.remove(this);
1✔
550
          }
1✔
551

552
          @Override
553
          ClientStream newSubstream(
554
              Metadata newHeaders, ClientStreamTracer.Factory factory, int previousAttempts,
555
              boolean isTransparentRetry) {
556
            CallOptions newOptions = callOptions.withStreamTracerFactory(factory);
1✔
557
            ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers(
1✔
558
                newOptions, newHeaders, previousAttempts, isTransparentRetry);
559
            ClientTransport transport =
1✔
560
                getTransport(new PickSubchannelArgsImpl(method, newHeaders, newOptions));
1✔
561
            Context origContext = context.attach();
1✔
562
            try {
563
              return transport.newStream(method, newHeaders, newOptions, tracers);
1✔
564
            } finally {
565
              context.detach(origContext);
1✔
566
            }
567
          }
568
        }
569

570
        return new RetryStream<>();
1✔
571
      }
572
    }
573
  }
574

575
  private final ChannelStreamProvider transportProvider = new ChannelStreamProvider();
1✔
576

577
  private final Rescheduler idleTimer;
578

579
  ManagedChannelImpl(
580
      ManagedChannelImplBuilder builder,
581
      ClientTransportFactory clientTransportFactory,
582
      BackoffPolicy.Provider backoffPolicyProvider,
583
      ObjectPool<? extends Executor> balancerRpcExecutorPool,
584
      Supplier<Stopwatch> stopwatchSupplier,
585
      List<ClientInterceptor> interceptors,
586
      final TimeProvider timeProvider) {
1✔
587
    this.target = checkNotNull(builder.target, "target");
1✔
588
    this.logId = InternalLogId.allocate("Channel", target);
1✔
589
    this.timeProvider = checkNotNull(timeProvider, "timeProvider");
1✔
590
    this.executorPool = checkNotNull(builder.executorPool, "executorPool");
1✔
591
    this.executor = checkNotNull(executorPool.getObject(), "executor");
1✔
592
    this.originalChannelCreds = builder.channelCredentials;
1✔
593
    this.originalTransportFactory = clientTransportFactory;
1✔
594
    this.offloadExecutorHolder =
1✔
595
        new ExecutorHolder(checkNotNull(builder.offloadExecutorPool, "offloadExecutorPool"));
1✔
596
    this.transportFactory = new CallCredentialsApplyingTransportFactory(
1✔
597
        clientTransportFactory, builder.callCredentials, this.offloadExecutorHolder);
598
    this.oobTransportFactory = new CallCredentialsApplyingTransportFactory(
1✔
599
        clientTransportFactory, null, this.offloadExecutorHolder);
600
    this.scheduledExecutor =
1✔
601
        new RestrictedScheduledExecutor(transportFactory.getScheduledExecutorService());
1✔
602
    maxTraceEvents = builder.maxTraceEvents;
1✔
603
    channelTracer = new ChannelTracer(
1✔
604
        logId, builder.maxTraceEvents, timeProvider.currentTimeNanos(),
1✔
605
        "Channel for '" + target + "'");
606
    channelLogger = new ChannelLoggerImpl(channelTracer, timeProvider);
1✔
607
    ProxyDetector proxyDetector =
608
        builder.proxyDetector != null ? builder.proxyDetector : GrpcUtil.DEFAULT_PROXY_DETECTOR;
1✔
609
    this.retryEnabled = builder.retryEnabled;
1✔
610
    this.loadBalancerFactory = new AutoConfiguredLoadBalancerFactory(builder.defaultLbPolicy);
1✔
611
    this.nameResolverRegistry = builder.nameResolverRegistry;
1✔
612
    ScParser serviceConfigParser =
1✔
613
        new ScParser(
614
            retryEnabled,
615
            builder.maxRetryAttempts,
616
            builder.maxHedgedAttempts,
617
            loadBalancerFactory);
618
    this.authorityOverride = builder.authorityOverride;
1✔
619
    this.nameResolverArgs =
1✔
620
        NameResolver.Args.newBuilder()
1✔
621
            .setDefaultPort(builder.getDefaultPort())
1✔
622
            .setProxyDetector(proxyDetector)
1✔
623
            .setSynchronizationContext(syncContext)
1✔
624
            .setScheduledExecutorService(scheduledExecutor)
1✔
625
            .setServiceConfigParser(serviceConfigParser)
1✔
626
            .setChannelLogger(channelLogger)
1✔
627
            .setOffloadExecutor(this.offloadExecutorHolder)
1✔
628
            .setOverrideAuthority(this.authorityOverride)
1✔
629
            .build();
1✔
630
    this.nameResolverFactory = builder.nameResolverFactory;
1✔
631
    this.nameResolver = getNameResolver(
1✔
632
        target, authorityOverride, nameResolverFactory, nameResolverArgs);
633
    this.balancerRpcExecutorPool = checkNotNull(balancerRpcExecutorPool, "balancerRpcExecutorPool");
1✔
634
    this.balancerRpcExecutorHolder = new ExecutorHolder(balancerRpcExecutorPool);
1✔
635
    this.delayedTransport = new DelayedClientTransport(this.executor, this.syncContext);
1✔
636
    this.delayedTransport.start(delayedTransportListener);
1✔
637
    this.backoffPolicyProvider = backoffPolicyProvider;
1✔
638

639
    if (builder.defaultServiceConfig != null) {
1✔
640
      ConfigOrError parsedDefaultServiceConfig =
1✔
641
          serviceConfigParser.parseServiceConfig(builder.defaultServiceConfig);
1✔
642
      checkState(
1✔
643
          parsedDefaultServiceConfig.getError() == null,
1✔
644
          "Default config is invalid: %s",
645
          parsedDefaultServiceConfig.getError());
1✔
646
      this.defaultServiceConfig =
1✔
647
          (ManagedChannelServiceConfig) parsedDefaultServiceConfig.getConfig();
1✔
648
      this.lastServiceConfig = this.defaultServiceConfig;
1✔
649
    } else {
1✔
650
      this.defaultServiceConfig = null;
1✔
651
    }
652
    this.lookUpServiceConfig = builder.lookUpServiceConfig;
1✔
653
    realChannel = new RealChannel(nameResolver.getServiceAuthority());
1✔
654
    Channel channel = realChannel;
1✔
655
    if (builder.binlog != null) {
1✔
656
      channel = builder.binlog.wrapChannel(channel);
1✔
657
    }
658
    this.interceptorChannel = ClientInterceptors.intercept(channel, interceptors);
1✔
659
    this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier");
1✔
660
    if (builder.idleTimeoutMillis == IDLE_TIMEOUT_MILLIS_DISABLE) {
1✔
661
      this.idleTimeoutMillis = builder.idleTimeoutMillis;
1✔
662
    } else {
663
      checkArgument(
1✔
664
          builder.idleTimeoutMillis
665
              >= ManagedChannelImplBuilder.IDLE_MODE_MIN_TIMEOUT_MILLIS,
666
          "invalid idleTimeoutMillis %s", builder.idleTimeoutMillis);
667
      this.idleTimeoutMillis = builder.idleTimeoutMillis;
1✔
668
    }
669

670
    idleTimer = new Rescheduler(
1✔
671
        new IdleModeTimer(),
672
        syncContext,
673
        transportFactory.getScheduledExecutorService(),
1✔
674
        stopwatchSupplier.get());
1✔
675
    this.fullStreamDecompression = builder.fullStreamDecompression;
1✔
676
    this.decompressorRegistry = checkNotNull(builder.decompressorRegistry, "decompressorRegistry");
1✔
677
    this.compressorRegistry = checkNotNull(builder.compressorRegistry, "compressorRegistry");
1✔
678
    this.userAgent = builder.userAgent;
1✔
679

680
    this.channelBufferLimit = builder.retryBufferSize;
1✔
681
    this.perRpcBufferLimit = builder.perRpcBufferLimit;
1✔
682
    final class ChannelCallTracerFactory implements CallTracer.Factory {
1✔
683
      @Override
684
      public CallTracer create() {
685
        return new CallTracer(timeProvider);
1✔
686
      }
687
    }
688

689
    this.callTracerFactory = new ChannelCallTracerFactory();
1✔
690
    channelCallTracer = callTracerFactory.create();
1✔
691
    this.channelz = checkNotNull(builder.channelz);
1✔
692
    channelz.addRootChannel(this);
1✔
693

694
    if (!lookUpServiceConfig) {
1✔
695
      if (defaultServiceConfig != null) {
1✔
696
        channelLogger.log(
1✔
697
            ChannelLogLevel.INFO, "Service config look-up disabled, using default service config");
698
      }
699
      serviceConfigUpdated = true;
1✔
700
    }
701
  }
1✔
702

703
  private static NameResolver getNameResolver(
704
      String target, NameResolver.Factory nameResolverFactory, NameResolver.Args nameResolverArgs) {
705
    // Finding a NameResolver. Try using the target string as the URI. If that fails, try prepending
706
    // "dns:///".
707
    URI targetUri = null;
1✔
708
    StringBuilder uriSyntaxErrors = new StringBuilder();
1✔
709
    try {
710
      targetUri = new URI(target);
1✔
711
      // For "localhost:8080" this would likely cause newNameResolver to return null, because
712
      // "localhost" is parsed as the scheme. Will fall into the next branch and try
713
      // "dns:///localhost:8080".
714
    } catch (URISyntaxException e) {
1✔
715
      // Can happen with ip addresses like "[::1]:1234" or 127.0.0.1:1234.
716
      uriSyntaxErrors.append(e.getMessage());
1✔
717
    }
1✔
718
    if (targetUri != null) {
1✔
719
      NameResolver resolver = nameResolverFactory.newNameResolver(targetUri, nameResolverArgs);
1✔
720
      if (resolver != null) {
1✔
721
        return resolver;
1✔
722
      }
723
      // "foo.googleapis.com:8080" cause resolver to be null, because "foo.googleapis.com" is an
724
      // unmapped scheme. Just fall through and will try "dns:///foo.googleapis.com:8080"
725
    }
726

727
    // If we reached here, the targetUri couldn't be used.
728
    if (!URI_PATTERN.matcher(target).matches()) {
1✔
729
      // It doesn't look like a URI target. Maybe it's an authority string. Try with the default
730
      // scheme from the factory.
731
      try {
732
        targetUri = new URI(nameResolverFactory.getDefaultScheme(), "", "/" + target, null);
1✔
733
      } catch (URISyntaxException e) {
×
734
        // Should not be possible.
735
        throw new IllegalArgumentException(e);
×
736
      }
1✔
737
      NameResolver resolver = nameResolverFactory.newNameResolver(targetUri, nameResolverArgs);
1✔
738
      if (resolver != null) {
1✔
739
        return resolver;
1✔
740
      }
741
    }
742
    throw new IllegalArgumentException(String.format(
1✔
743
        "cannot find a NameResolver for %s%s",
744
        target, uriSyntaxErrors.length() > 0 ? " (" + uriSyntaxErrors + ")" : ""));
1✔
745
  }
746

747
  @VisibleForTesting
748
  static NameResolver getNameResolver(
749
      String target, @Nullable final String overrideAuthority,
750
      NameResolver.Factory nameResolverFactory, NameResolver.Args nameResolverArgs) {
751
    NameResolver resolver = getNameResolver(target, nameResolverFactory, nameResolverArgs);
1✔
752

753
    // If the nameResolver is not already a RetryingNameResolver, then wrap it with it.
754
    // This helps guarantee that name resolution retry remains supported even as it has been
755
    // removed from ManagedChannelImpl.
756
    // TODO: After a transition period, all NameResolver implementations that need retry should use
757
    //       RetryingNameResolver directly and this step can be removed.
758
    NameResolver usedNameResolver;
759
    if (resolver instanceof RetryingNameResolver) {
1✔
760
      usedNameResolver = resolver;
1✔
761
    } else {
762
      usedNameResolver = new RetryingNameResolver(resolver,
1✔
763
          new BackoffPolicyRetryScheduler(new ExponentialBackoffPolicy.Provider(),
764
              nameResolverArgs.getScheduledExecutorService(),
1✔
765
              nameResolverArgs.getSynchronizationContext()),
1✔
766
          nameResolverArgs.getSynchronizationContext());
1✔
767
    }
768

769
    if (overrideAuthority == null) {
1✔
770
      return usedNameResolver;
1✔
771
    }
772

773
    return new ForwardingNameResolver(usedNameResolver) {
1✔
774
      @Override
775
      public String getServiceAuthority() {
776
        return overrideAuthority;
1✔
777
      }
778
    };
779
  }
780

781
  @VisibleForTesting
782
  InternalConfigSelector getConfigSelector() {
783
    return realChannel.configSelector.get();
1✔
784
  }
785

786
  /**
787
   * Initiates an orderly shutdown in which preexisting calls continue but new calls are immediately
788
   * cancelled.
789
   */
790
  @Override
791
  public ManagedChannelImpl shutdown() {
792
    channelLogger.log(ChannelLogLevel.DEBUG, "shutdown() called");
1✔
793
    if (!shutdown.compareAndSet(false, true)) {
1✔
794
      return this;
1✔
795
    }
796
    final class Shutdown implements Runnable {
1✔
797
      @Override
798
      public void run() {
799
        channelLogger.log(ChannelLogLevel.INFO, "Entering SHUTDOWN state");
1✔
800
        channelStateManager.gotoState(SHUTDOWN);
1✔
801
      }
1✔
802
    }
803

804
    syncContext.execute(new Shutdown());
1✔
805
    realChannel.shutdown();
1✔
806
    final class CancelIdleTimer implements Runnable {
1✔
807
      @Override
808
      public void run() {
809
        cancelIdleTimer(/* permanent= */ true);
1✔
810
      }
1✔
811
    }
812

813
    syncContext.execute(new CancelIdleTimer());
1✔
814
    return this;
1✔
815
  }
816

817
  /**
818
   * Initiates a forceful shutdown in which preexisting and new calls are cancelled. Although
819
   * forceful, the shutdown process is still not instantaneous; {@link #isTerminated()} will likely
820
   * return {@code false} immediately after this method returns.
821
   */
822
  @Override
823
  public ManagedChannelImpl shutdownNow() {
824
    channelLogger.log(ChannelLogLevel.DEBUG, "shutdownNow() called");
1✔
825
    shutdown();
1✔
826
    realChannel.shutdownNow();
1✔
827
    final class ShutdownNow implements Runnable {
1✔
828
      @Override
829
      public void run() {
830
        if (shutdownNowed) {
1✔
831
          return;
1✔
832
        }
833
        shutdownNowed = true;
1✔
834
        maybeShutdownNowSubchannels();
1✔
835
      }
1✔
836
    }
837

838
    syncContext.execute(new ShutdownNow());
1✔
839
    return this;
1✔
840
  }
841

842
  // Called from syncContext
843
  @VisibleForTesting
844
  void panic(final Throwable t) {
845
    if (panicMode) {
1✔
846
      // Preserve the first panic information
847
      return;
×
848
    }
849
    panicMode = true;
1✔
850
    cancelIdleTimer(/* permanent= */ true);
1✔
851
    shutdownNameResolverAndLoadBalancer(false);
1✔
852
    final class PanicSubchannelPicker extends SubchannelPicker {
1✔
853
      private final PickResult panicPickResult =
1✔
854
          PickResult.withDrop(
1✔
855
              Status.INTERNAL.withDescription("Panic! This is a bug!").withCause(t));
1✔
856

857
      @Override
858
      public PickResult pickSubchannel(PickSubchannelArgs args) {
859
        return panicPickResult;
1✔
860
      }
861

862
      @Override
863
      public String toString() {
864
        return MoreObjects.toStringHelper(PanicSubchannelPicker.class)
×
865
            .add("panicPickResult", panicPickResult)
×
866
            .toString();
×
867
      }
868
    }
869

870
    updateSubchannelPicker(new PanicSubchannelPicker());
1✔
871
    realChannel.updateConfigSelector(null);
1✔
872
    channelLogger.log(ChannelLogLevel.ERROR, "PANIC! Entering TRANSIENT_FAILURE");
1✔
873
    channelStateManager.gotoState(TRANSIENT_FAILURE);
1✔
874
  }
1✔
875

876
  @VisibleForTesting
877
  boolean isInPanicMode() {
878
    return panicMode;
1✔
879
  }
880

881
  // Called from syncContext
882
  private void updateSubchannelPicker(SubchannelPicker newPicker) {
883
    subchannelPicker = newPicker;
1✔
884
    delayedTransport.reprocess(newPicker);
1✔
885
  }
1✔
886

887
  @Override
888
  public boolean isShutdown() {
889
    return shutdown.get();
1✔
890
  }
891

892
  @Override
893
  public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
894
    return terminatedLatch.await(timeout, unit);
1✔
895
  }
896

897
  @Override
898
  public boolean isTerminated() {
899
    return terminated;
1✔
900
  }
901

902
  /*
903
   * Creates a new outgoing call on the channel.
904
   */
905
  @Override
906
  public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(MethodDescriptor<ReqT, RespT> method,
907
      CallOptions callOptions) {
908
    return interceptorChannel.newCall(method, callOptions);
1✔
909
  }
910

911
  @Override
912
  public String authority() {
913
    return interceptorChannel.authority();
1✔
914
  }
915

916
  private Executor getCallExecutor(CallOptions callOptions) {
917
    Executor executor = callOptions.getExecutor();
1✔
918
    if (executor == null) {
1✔
919
      executor = this.executor;
1✔
920
    }
921
    return executor;
1✔
922
  }
923

924
  private class RealChannel extends Channel {
925
    // Reference to null if no config selector is available from resolution result
926
    // Reference must be set() from syncContext
927
    private final AtomicReference<InternalConfigSelector> configSelector =
1✔
928
        new AtomicReference<>(INITIAL_PENDING_SELECTOR);
1✔
929
    // Set when the NameResolver is initially created. When we create a new NameResolver for the
930
    // same target, the new instance must have the same value.
931
    private final String authority;
932

933
    private final Channel clientCallImplChannel = new Channel() {
1✔
934
      @Override
935
      public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
936
          MethodDescriptor<RequestT, ResponseT> method, CallOptions callOptions) {
937
        return new ClientCallImpl<>(
1✔
938
            method,
939
            getCallExecutor(callOptions),
1✔
940
            callOptions,
941
            transportProvider,
1✔
942
            terminated ? null : transportFactory.getScheduledExecutorService(),
1✔
943
            channelCallTracer,
1✔
944
            null)
945
            .setFullStreamDecompression(fullStreamDecompression)
1✔
946
            .setDecompressorRegistry(decompressorRegistry)
1✔
947
            .setCompressorRegistry(compressorRegistry);
1✔
948
      }
949

950
      @Override
951
      public String authority() {
952
        return authority;
×
953
      }
954
    };
955

956
    private RealChannel(String authority) {
1✔
957
      this.authority =  checkNotNull(authority, "authority");
1✔
958
    }
1✔
959

960
    @Override
961
    public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(
962
        MethodDescriptor<ReqT, RespT> method, CallOptions callOptions) {
963
      if (configSelector.get() != INITIAL_PENDING_SELECTOR) {
1✔
964
        return newClientCall(method, callOptions);
1✔
965
      }
966
      syncContext.execute(new Runnable() {
1✔
967
        @Override
968
        public void run() {
969
          exitIdleMode();
1✔
970
        }
1✔
971
      });
972
      if (configSelector.get() != INITIAL_PENDING_SELECTOR) {
1✔
973
        // This is an optimization for the case (typically with InProcessTransport) when name
974
        // resolution result is immediately available at this point. Otherwise, some users'
975
        // tests might observe slight behavior difference from earlier grpc versions.
976
        return newClientCall(method, callOptions);
1✔
977
      }
978
      if (shutdown.get()) {
1✔
979
        // Return a failing ClientCall.
980
        return new ClientCall<ReqT, RespT>() {
×
981
          @Override
982
          public void start(Listener<RespT> responseListener, Metadata headers) {
983
            responseListener.onClose(SHUTDOWN_STATUS, new Metadata());
×
984
          }
×
985

986
          @Override public void request(int numMessages) {}
×
987

988
          @Override public void cancel(@Nullable String message, @Nullable Throwable cause) {}
×
989

990
          @Override public void halfClose() {}
×
991

992
          @Override public void sendMessage(ReqT message) {}
×
993
        };
994
      }
995
      Context context = Context.current();
1✔
996
      final PendingCall<ReqT, RespT> pendingCall = new PendingCall<>(context, method, callOptions);
1✔
997
      syncContext.execute(new Runnable() {
1✔
998
        @Override
999
        public void run() {
1000
          if (configSelector.get() == INITIAL_PENDING_SELECTOR) {
1✔
1001
            if (pendingCalls == null) {
1✔
1002
              pendingCalls = new LinkedHashSet<>();
1✔
1003
              inUseStateAggregator.updateObjectInUse(pendingCallsInUseObject, true);
1✔
1004
            }
1005
            pendingCalls.add(pendingCall);
1✔
1006
          } else {
1007
            pendingCall.reprocess();
1✔
1008
          }
1009
        }
1✔
1010
      });
1011
      return pendingCall;
1✔
1012
    }
1013

1014
    // Must run in SynchronizationContext.
1015
    void updateConfigSelector(@Nullable InternalConfigSelector config) {
1016
      InternalConfigSelector prevConfig = configSelector.get();
1✔
1017
      configSelector.set(config);
1✔
1018
      if (prevConfig == INITIAL_PENDING_SELECTOR && pendingCalls != null) {
1✔
1019
        for (RealChannel.PendingCall<?, ?> pendingCall : pendingCalls) {
1✔
1020
          pendingCall.reprocess();
1✔
1021
        }
1✔
1022
      }
1023
    }
1✔
1024

1025
    // Must run in SynchronizationContext.
1026
    void onConfigError() {
1027
      if (configSelector.get() == INITIAL_PENDING_SELECTOR) {
1✔
1028
        updateConfigSelector(null);
1✔
1029
      }
1030
    }
1✔
1031

1032
    void shutdown() {
1033
      final class RealChannelShutdown implements Runnable {
1✔
1034
        @Override
1035
        public void run() {
1036
          if (pendingCalls == null) {
1✔
1037
            if (configSelector.get() == INITIAL_PENDING_SELECTOR) {
1✔
1038
              configSelector.set(null);
1✔
1039
            }
1040
            uncommittedRetriableStreamsRegistry.onShutdown(SHUTDOWN_STATUS);
1✔
1041
          }
1042
        }
1✔
1043
      }
1044

1045
      syncContext.execute(new RealChannelShutdown());
1✔
1046
    }
1✔
1047

1048
    void shutdownNow() {
1049
      final class RealChannelShutdownNow implements Runnable {
1✔
1050
        @Override
1051
        public void run() {
1052
          if (configSelector.get() == INITIAL_PENDING_SELECTOR) {
1✔
1053
            configSelector.set(null);
1✔
1054
          }
1055
          if (pendingCalls != null) {
1✔
1056
            for (RealChannel.PendingCall<?, ?> pendingCall : pendingCalls) {
1✔
1057
              pendingCall.cancel("Channel is forcefully shutdown", null);
1✔
1058
            }
1✔
1059
          }
1060
          uncommittedRetriableStreamsRegistry.onShutdownNow(SHUTDOWN_NOW_STATUS);
1✔
1061
        }
1✔
1062
      }
1063

1064
      syncContext.execute(new RealChannelShutdownNow());
1✔
1065
    }
1✔
1066

1067
    @Override
1068
    public String authority() {
1069
      return authority;
1✔
1070
    }
1071

1072
    private final class PendingCall<ReqT, RespT> extends DelayedClientCall<ReqT, RespT> {
1073
      final Context context;
1074
      final MethodDescriptor<ReqT, RespT> method;
1075
      final CallOptions callOptions;
1076

1077
      PendingCall(
1078
          Context context, MethodDescriptor<ReqT, RespT> method, CallOptions callOptions) {
1✔
1079
        super(getCallExecutor(callOptions), scheduledExecutor, callOptions.getDeadline());
1✔
1080
        this.context = context;
1✔
1081
        this.method = method;
1✔
1082
        this.callOptions = callOptions;
1✔
1083
      }
1✔
1084

1085
      /** Called when it's ready to create a real call and reprocess the pending call. */
1086
      void reprocess() {
1087
        ClientCall<ReqT, RespT> realCall;
1088
        Context previous = context.attach();
1✔
1089
        try {
1090
          CallOptions delayResolutionOption = callOptions.withOption(NAME_RESOLUTION_DELAYED, true);
1✔
1091
          realCall = newClientCall(method, delayResolutionOption);
1✔
1092
        } finally {
1093
          context.detach(previous);
1✔
1094
        }
1095
        Runnable toRun = setCall(realCall);
1✔
1096
        if (toRun == null) {
1✔
1097
          syncContext.execute(new PendingCallRemoval());
1✔
1098
        } else {
1099
          getCallExecutor(callOptions).execute(new Runnable() {
1✔
1100
            @Override
1101
            public void run() {
1102
              toRun.run();
1✔
1103
              syncContext.execute(new PendingCallRemoval());
1✔
1104
            }
1✔
1105
          });
1106
        }
1107
      }
1✔
1108

1109
      @Override
1110
      protected void callCancelled() {
1111
        super.callCancelled();
1✔
1112
        syncContext.execute(new PendingCallRemoval());
1✔
1113
      }
1✔
1114

1115
      final class PendingCallRemoval implements Runnable {
1✔
1116
        @Override
1117
        public void run() {
1118
          if (pendingCalls != null) {
1✔
1119
            pendingCalls.remove(PendingCall.this);
1✔
1120
            if (pendingCalls.isEmpty()) {
1✔
1121
              inUseStateAggregator.updateObjectInUse(pendingCallsInUseObject, false);
1✔
1122
              pendingCalls = null;
1✔
1123
              if (shutdown.get()) {
1✔
1124
                uncommittedRetriableStreamsRegistry.onShutdown(SHUTDOWN_STATUS);
1✔
1125
              }
1126
            }
1127
          }
1128
        }
1✔
1129
      }
1130
    }
1131

1132
    private <ReqT, RespT> ClientCall<ReqT, RespT> newClientCall(
1133
        MethodDescriptor<ReqT, RespT> method, CallOptions callOptions) {
1134
      InternalConfigSelector selector = configSelector.get();
1✔
1135
      if (selector == null) {
1✔
1136
        return clientCallImplChannel.newCall(method, callOptions);
1✔
1137
      }
1138
      if (selector instanceof ServiceConfigConvertedSelector) {
1✔
1139
        MethodInfo methodInfo =
1✔
1140
            ((ServiceConfigConvertedSelector) selector).config.getMethodConfig(method);
1✔
1141
        if (methodInfo != null) {
1✔
1142
          callOptions = callOptions.withOption(MethodInfo.KEY, methodInfo);
1✔
1143
        }
1144
        return clientCallImplChannel.newCall(method, callOptions);
1✔
1145
      }
1146
      return new ConfigSelectingClientCall<>(
1✔
1147
          selector, clientCallImplChannel, executor, method, callOptions);
1✔
1148
    }
1149
  }
1150

1151
  /**
1152
   * A client call for a given channel that applies a given config selector when it starts.
1153
   */
1154
  static final class ConfigSelectingClientCall<ReqT, RespT>
1155
      extends ForwardingClientCall<ReqT, RespT> {
1156

1157
    private final InternalConfigSelector configSelector;
1158
    private final Channel channel;
1159
    private final Executor callExecutor;
1160
    private final MethodDescriptor<ReqT, RespT> method;
1161
    private final Context context;
1162
    private CallOptions callOptions;
1163

1164
    private ClientCall<ReqT, RespT> delegate;
1165

1166
    ConfigSelectingClientCall(
1167
        InternalConfigSelector configSelector, Channel channel, Executor channelExecutor,
1168
        MethodDescriptor<ReqT, RespT> method,
1169
        CallOptions callOptions) {
1✔
1170
      this.configSelector = configSelector;
1✔
1171
      this.channel = channel;
1✔
1172
      this.method = method;
1✔
1173
      this.callExecutor =
1✔
1174
          callOptions.getExecutor() == null ? channelExecutor : callOptions.getExecutor();
1✔
1175
      this.callOptions = callOptions.withExecutor(callExecutor);
1✔
1176
      this.context = Context.current();
1✔
1177
    }
1✔
1178

1179
    @Override
1180
    protected ClientCall<ReqT, RespT> delegate() {
1181
      return delegate;
1✔
1182
    }
1183

1184
    @SuppressWarnings("unchecked")
1185
    @Override
1186
    public void start(Listener<RespT> observer, Metadata headers) {
1187
      PickSubchannelArgs args = new PickSubchannelArgsImpl(method, headers, callOptions);
1✔
1188
      InternalConfigSelector.Result result = configSelector.selectConfig(args);
1✔
1189
      Status status = result.getStatus();
1✔
1190
      if (!status.isOk()) {
1✔
1191
        executeCloseObserverInContext(observer,
1✔
1192
            GrpcUtil.replaceInappropriateControlPlaneStatus(status));
1✔
1193
        delegate = (ClientCall<ReqT, RespT>) NOOP_CALL;
1✔
1194
        return;
1✔
1195
      }
1196
      ClientInterceptor interceptor = result.getInterceptor();
1✔
1197
      ManagedChannelServiceConfig config = (ManagedChannelServiceConfig) result.getConfig();
1✔
1198
      MethodInfo methodInfo = config.getMethodConfig(method);
1✔
1199
      if (methodInfo != null) {
1✔
1200
        callOptions = callOptions.withOption(MethodInfo.KEY, methodInfo);
1✔
1201
      }
1202
      if (interceptor != null) {
1✔
1203
        delegate = interceptor.interceptCall(method, callOptions, channel);
1✔
1204
      } else {
1205
        delegate = channel.newCall(method, callOptions);
×
1206
      }
1207
      delegate.start(observer, headers);
1✔
1208
    }
1✔
1209

1210
    private void executeCloseObserverInContext(
1211
        final Listener<RespT> observer, final Status status) {
1212
      class CloseInContext extends ContextRunnable {
1213
        CloseInContext() {
1✔
1214
          super(context);
1✔
1215
        }
1✔
1216

1217
        @Override
1218
        public void runInContext() {
1219
          observer.onClose(status, new Metadata());
1✔
1220
        }
1✔
1221
      }
1222

1223
      callExecutor.execute(new CloseInContext());
1✔
1224
    }
1✔
1225

1226
    @Override
1227
    public void cancel(@Nullable String message, @Nullable Throwable cause) {
1228
      if (delegate != null) {
×
1229
        delegate.cancel(message, cause);
×
1230
      }
1231
    }
×
1232
  }
1233

1234
  private static final ClientCall<Object, Object> NOOP_CALL = new ClientCall<Object, Object>() {
1✔
1235
    @Override
1236
    public void start(Listener<Object> responseListener, Metadata headers) {}
×
1237

1238
    @Override
1239
    public void request(int numMessages) {}
1✔
1240

1241
    @Override
1242
    public void cancel(String message, Throwable cause) {}
×
1243

1244
    @Override
1245
    public void halfClose() {}
×
1246

1247
    @Override
1248
    public void sendMessage(Object message) {}
×
1249

1250
    // Always returns {@code false}, since this is only used when the startup of the call fails.
1251
    @Override
1252
    public boolean isReady() {
1253
      return false;
×
1254
    }
1255
  };
1256

1257
  /**
1258
   * Terminate the channel if termination conditions are met.
1259
   */
1260
  // Must be run from syncContext
1261
  private void maybeTerminateChannel() {
1262
    if (terminated) {
1✔
1263
      return;
×
1264
    }
1265
    if (shutdown.get() && subchannels.isEmpty() && oobChannels.isEmpty()) {
1✔
1266
      channelLogger.log(ChannelLogLevel.INFO, "Terminated");
1✔
1267
      channelz.removeRootChannel(this);
1✔
1268
      executorPool.returnObject(executor);
1✔
1269
      balancerRpcExecutorHolder.release();
1✔
1270
      offloadExecutorHolder.release();
1✔
1271
      // Release the transport factory so that it can deallocate any resources.
1272
      transportFactory.close();
1✔
1273

1274
      terminated = true;
1✔
1275
      terminatedLatch.countDown();
1✔
1276
    }
1277
  }
1✔
1278

1279
  // Must be called from syncContext
1280
  private void handleInternalSubchannelState(ConnectivityStateInfo newState) {
1281
    if (newState.getState() == TRANSIENT_FAILURE || newState.getState() == IDLE) {
1✔
1282
      refreshNameResolution();
1✔
1283
    }
1284
  }
1✔
1285

1286
  @Override
1287
  @SuppressWarnings("deprecation")
1288
  public ConnectivityState getState(boolean requestConnection) {
1289
    ConnectivityState savedChannelState = channelStateManager.getState();
1✔
1290
    if (requestConnection && savedChannelState == IDLE) {
1✔
1291
      final class RequestConnection implements Runnable {
1✔
1292
        @Override
1293
        public void run() {
1294
          exitIdleMode();
1✔
1295
          if (subchannelPicker != null) {
1✔
1296
            subchannelPicker.requestConnection();
1✔
1297
          }
1298
          if (lbHelper != null) {
1✔
1299
            lbHelper.lb.requestConnection();
1✔
1300
          }
1301
        }
1✔
1302
      }
1303

1304
      syncContext.execute(new RequestConnection());
1✔
1305
    }
1306
    return savedChannelState;
1✔
1307
  }
1308

1309
  @Override
1310
  public void notifyWhenStateChanged(final ConnectivityState source, final Runnable callback) {
1311
    final class NotifyStateChanged implements Runnable {
1✔
1312
      @Override
1313
      public void run() {
1314
        channelStateManager.notifyWhenStateChanged(callback, executor, source);
1✔
1315
      }
1✔
1316
    }
1317

1318
    syncContext.execute(new NotifyStateChanged());
1✔
1319
  }
1✔
1320

1321
  @Override
1322
  public void resetConnectBackoff() {
1323
    final class ResetConnectBackoff implements Runnable {
1✔
1324
      @Override
1325
      public void run() {
1326
        if (shutdown.get()) {
1✔
1327
          return;
1✔
1328
        }
1329
        if (nameResolverStarted) {
1✔
1330
          refreshNameResolution();
1✔
1331
        }
1332
        for (InternalSubchannel subchannel : subchannels) {
1✔
1333
          subchannel.resetConnectBackoff();
1✔
1334
        }
1✔
1335
        for (OobChannel oobChannel : oobChannels) {
1✔
1336
          oobChannel.resetConnectBackoff();
×
1337
        }
×
1338
      }
1✔
1339
    }
1340

1341
    syncContext.execute(new ResetConnectBackoff());
1✔
1342
  }
1✔
1343

1344
  @Override
1345
  public void enterIdle() {
1346
    final class PrepareToLoseNetworkRunnable implements Runnable {
1✔
1347
      @Override
1348
      public void run() {
1349
        if (shutdown.get() || lbHelper == null) {
1✔
1350
          return;
1✔
1351
        }
1352
        cancelIdleTimer(/* permanent= */ false);
1✔
1353
        enterIdleMode();
1✔
1354
      }
1✔
1355
    }
1356

1357
    syncContext.execute(new PrepareToLoseNetworkRunnable());
1✔
1358
  }
1✔
1359

1360
  /**
1361
   * A registry that prevents channel shutdown from killing existing retry attempts that are in
1362
   * backoff.
1363
   */
1364
  private final class UncommittedRetriableStreamsRegistry {
1✔
1365
    // TODO(zdapeng): This means we would acquire a lock for each new retry-able stream,
1366
    // it's worthwhile to look for a lock-free approach.
1367
    final Object lock = new Object();
1✔
1368

1369
    @GuardedBy("lock")
1✔
1370
    Collection<ClientStream> uncommittedRetriableStreams = new HashSet<>();
1371

1372
    @GuardedBy("lock")
1373
    Status shutdownStatus;
1374

1375
    void onShutdown(Status reason) {
1376
      boolean shouldShutdownDelayedTransport = false;
1✔
1377
      synchronized (lock) {
1✔
1378
        if (shutdownStatus != null) {
1✔
1379
          return;
1✔
1380
        }
1381
        shutdownStatus = reason;
1✔
1382
        // Keep the delayedTransport open until there is no more uncommitted streams, b/c those
1383
        // retriable streams, which may be in backoff and not using any transport, are already
1384
        // started RPCs.
1385
        if (uncommittedRetriableStreams.isEmpty()) {
1✔
1386
          shouldShutdownDelayedTransport = true;
1✔
1387
        }
1388
      }
1✔
1389

1390
      if (shouldShutdownDelayedTransport) {
1✔
1391
        delayedTransport.shutdown(reason);
1✔
1392
      }
1393
    }
1✔
1394

1395
    void onShutdownNow(Status reason) {
1396
      onShutdown(reason);
1✔
1397
      Collection<ClientStream> streams;
1398

1399
      synchronized (lock) {
1✔
1400
        streams = new ArrayList<>(uncommittedRetriableStreams);
1✔
1401
      }
1✔
1402

1403
      for (ClientStream stream : streams) {
1✔
1404
        stream.cancel(reason);
1✔
1405
      }
1✔
1406
      delayedTransport.shutdownNow(reason);
1✔
1407
    }
1✔
1408

1409
    /**
1410
     * Registers a RetriableStream and return null if not shutdown, otherwise just returns the
1411
     * shutdown Status.
1412
     */
1413
    @Nullable
1414
    Status add(RetriableStream<?> retriableStream) {
1415
      synchronized (lock) {
1✔
1416
        if (shutdownStatus != null) {
1✔
1417
          return shutdownStatus;
1✔
1418
        }
1419
        uncommittedRetriableStreams.add(retriableStream);
1✔
1420
        return null;
1✔
1421
      }
1422
    }
1423

1424
    void remove(RetriableStream<?> retriableStream) {
1425
      Status shutdownStatusCopy = null;
1✔
1426

1427
      synchronized (lock) {
1✔
1428
        uncommittedRetriableStreams.remove(retriableStream);
1✔
1429
        if (uncommittedRetriableStreams.isEmpty()) {
1✔
1430
          shutdownStatusCopy = shutdownStatus;
1✔
1431
          // Because retriable transport is long-lived, we take this opportunity to down-size the
1432
          // hashmap.
1433
          uncommittedRetriableStreams = new HashSet<>();
1✔
1434
        }
1435
      }
1✔
1436

1437
      if (shutdownStatusCopy != null) {
1✔
1438
        delayedTransport.shutdown(shutdownStatusCopy);
1✔
1439
      }
1440
    }
1✔
1441
  }
1442

1443
  private final class LbHelperImpl extends LoadBalancer.Helper {
1✔
1444
    AutoConfiguredLoadBalancer lb;
1445

1446
    @Override
1447
    public AbstractSubchannel createSubchannel(CreateSubchannelArgs args) {
1448
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1449
      // No new subchannel should be created after load balancer has been shutdown.
1450
      checkState(!terminating, "Channel is being terminated");
1✔
1451
      return new SubchannelImpl(args);
1✔
1452
    }
1453

1454
    @Override
1455
    public void updateBalancingState(
1456
        final ConnectivityState newState, final SubchannelPicker newPicker) {
1457
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1458
      checkNotNull(newState, "newState");
1✔
1459
      checkNotNull(newPicker, "newPicker");
1✔
1460
      final class UpdateBalancingState implements Runnable {
1✔
1461
        @Override
1462
        public void run() {
1463
          if (LbHelperImpl.this != lbHelper) {
1✔
1464
            return;
1✔
1465
          }
1466
          updateSubchannelPicker(newPicker);
1✔
1467
          // It's not appropriate to report SHUTDOWN state from lb.
1468
          // Ignore the case of newState == SHUTDOWN for now.
1469
          if (newState != SHUTDOWN) {
1✔
1470
            channelLogger.log(
1✔
1471
                ChannelLogLevel.INFO, "Entering {0} state with picker: {1}", newState, newPicker);
1472
            channelStateManager.gotoState(newState);
1✔
1473
          }
1474
        }
1✔
1475
      }
1476

1477
      syncContext.execute(new UpdateBalancingState());
1✔
1478
    }
1✔
1479

1480
    @Override
1481
    public void refreshNameResolution() {
1482
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1483
      final class LoadBalancerRefreshNameResolution implements Runnable {
1✔
1484
        @Override
1485
        public void run() {
1486
          ManagedChannelImpl.this.refreshNameResolution();
1✔
1487
        }
1✔
1488
      }
1489

1490
      syncContext.execute(new LoadBalancerRefreshNameResolution());
1✔
1491
    }
1✔
1492

1493
    @Override
1494
    public ManagedChannel createOobChannel(EquivalentAddressGroup addressGroup, String authority) {
1495
      return createOobChannel(Collections.singletonList(addressGroup), authority);
×
1496
    }
1497

1498
    @Override
1499
    public ManagedChannel createOobChannel(List<EquivalentAddressGroup> addressGroup,
1500
        String authority) {
1501
      // TODO(ejona): can we be even stricter? Like terminating?
1502
      checkState(!terminated, "Channel is terminated");
1✔
1503
      long oobChannelCreationTime = timeProvider.currentTimeNanos();
1✔
1504
      InternalLogId oobLogId = InternalLogId.allocate("OobChannel", /*details=*/ null);
1✔
1505
      InternalLogId subchannelLogId =
1✔
1506
          InternalLogId.allocate("Subchannel-OOB", /*details=*/ authority);
1✔
1507
      ChannelTracer oobChannelTracer =
1✔
1508
          new ChannelTracer(
1509
              oobLogId, maxTraceEvents, oobChannelCreationTime,
1✔
1510
              "OobChannel for " + addressGroup);
1511
      final OobChannel oobChannel = new OobChannel(
1✔
1512
          authority, balancerRpcExecutorPool, oobTransportFactory.getScheduledExecutorService(),
1✔
1513
          syncContext, callTracerFactory.create(), oobChannelTracer, channelz, timeProvider);
1✔
1514
      channelTracer.reportEvent(new ChannelTrace.Event.Builder()
1✔
1515
          .setDescription("Child OobChannel created")
1✔
1516
          .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
1✔
1517
          .setTimestampNanos(oobChannelCreationTime)
1✔
1518
          .setChannelRef(oobChannel)
1✔
1519
          .build());
1✔
1520
      ChannelTracer subchannelTracer =
1✔
1521
          new ChannelTracer(subchannelLogId, maxTraceEvents, oobChannelCreationTime,
1✔
1522
              "Subchannel for " + addressGroup);
1523
      ChannelLogger subchannelLogger = new ChannelLoggerImpl(subchannelTracer, timeProvider);
1✔
1524
      final class ManagedOobChannelCallback extends InternalSubchannel.Callback {
1✔
1525
        @Override
1526
        void onTerminated(InternalSubchannel is) {
1527
          oobChannels.remove(oobChannel);
1✔
1528
          channelz.removeSubchannel(is);
1✔
1529
          oobChannel.handleSubchannelTerminated();
1✔
1530
          maybeTerminateChannel();
1✔
1531
        }
1✔
1532

1533
        @Override
1534
        void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) {
1535
          // TODO(chengyuanzhang): change to let LB policies explicitly manage OOB channel's
1536
          //  state and refresh name resolution if necessary.
1537
          handleInternalSubchannelState(newState);
1✔
1538
          oobChannel.handleSubchannelStateChange(newState);
1✔
1539
        }
1✔
1540
      }
1541

1542
      final InternalSubchannel internalSubchannel = new InternalSubchannel(
1✔
1543
          addressGroup,
1544
          authority, userAgent, backoffPolicyProvider, oobTransportFactory,
1✔
1545
          oobTransportFactory.getScheduledExecutorService(), stopwatchSupplier, syncContext,
1✔
1546
          // All callback methods are run from syncContext
1547
          new ManagedOobChannelCallback(),
1548
          channelz,
1✔
1549
          callTracerFactory.create(),
1✔
1550
          subchannelTracer,
1551
          subchannelLogId,
1552
          subchannelLogger);
1553
      oobChannelTracer.reportEvent(new ChannelTrace.Event.Builder()
1✔
1554
          .setDescription("Child Subchannel created")
1✔
1555
          .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
1✔
1556
          .setTimestampNanos(oobChannelCreationTime)
1✔
1557
          .setSubchannelRef(internalSubchannel)
1✔
1558
          .build());
1✔
1559
      channelz.addSubchannel(oobChannel);
1✔
1560
      channelz.addSubchannel(internalSubchannel);
1✔
1561
      oobChannel.setSubchannel(internalSubchannel);
1✔
1562
      final class AddOobChannel implements Runnable {
1✔
1563
        @Override
1564
        public void run() {
1565
          if (terminating) {
1✔
1566
            oobChannel.shutdown();
×
1567
          }
1568
          if (!terminated) {
1✔
1569
            // If channel has not terminated, it will track the subchannel and block termination
1570
            // for it.
1571
            oobChannels.add(oobChannel);
1✔
1572
          }
1573
        }
1✔
1574
      }
1575

1576
      syncContext.execute(new AddOobChannel());
1✔
1577
      return oobChannel;
1✔
1578
    }
1579

1580
    @Deprecated
1581
    @Override
1582
    public ManagedChannelBuilder<?> createResolvingOobChannelBuilder(String target) {
1583
      return createResolvingOobChannelBuilder(target, new DefaultChannelCreds())
1✔
1584
          // Override authority to keep the old behavior.
1585
          // createResolvingOobChannelBuilder(String target) will be deleted soon.
1586
          .overrideAuthority(getAuthority());
1✔
1587
    }
1588

1589
    // TODO(creamsoup) prevent main channel to shutdown if oob channel is not terminated
1590
    // TODO(zdapeng) register the channel as a subchannel of the parent channel in channelz.
1591
    @Override
1592
    public ManagedChannelBuilder<?> createResolvingOobChannelBuilder(
1593
        final String target, final ChannelCredentials channelCreds) {
1594
      checkNotNull(channelCreds, "channelCreds");
1✔
1595

1596
      final class ResolvingOobChannelBuilder
1597
          extends ForwardingChannelBuilder<ResolvingOobChannelBuilder> {
1598
        final ManagedChannelBuilder<?> delegate;
1599

1600
        ResolvingOobChannelBuilder() {
1✔
1601
          final ClientTransportFactory transportFactory;
1602
          CallCredentials callCredentials;
1603
          if (channelCreds instanceof DefaultChannelCreds) {
1✔
1604
            transportFactory = originalTransportFactory;
1✔
1605
            callCredentials = null;
1✔
1606
          } else {
1607
            SwapChannelCredentialsResult swapResult =
1✔
1608
                originalTransportFactory.swapChannelCredentials(channelCreds);
1✔
1609
            if (swapResult == null) {
1✔
1610
              delegate = Grpc.newChannelBuilder(target, channelCreds);
×
1611
              return;
×
1612
            } else {
1613
              transportFactory = swapResult.transportFactory;
1✔
1614
              callCredentials = swapResult.callCredentials;
1✔
1615
            }
1616
          }
1617
          ClientTransportFactoryBuilder transportFactoryBuilder =
1✔
1618
              new ClientTransportFactoryBuilder() {
1✔
1619
                @Override
1620
                public ClientTransportFactory buildClientTransportFactory() {
1621
                  return transportFactory;
1✔
1622
                }
1623
              };
1624
          delegate = new ManagedChannelImplBuilder(
1✔
1625
              target,
1626
              channelCreds,
1627
              callCredentials,
1628
              transportFactoryBuilder,
1629
              new FixedPortProvider(nameResolverArgs.getDefaultPort()));
1✔
1630
        }
1✔
1631

1632
        @Override
1633
        protected ManagedChannelBuilder<?> delegate() {
1634
          return delegate;
1✔
1635
        }
1636
      }
1637

1638
      checkState(!terminated, "Channel is terminated");
1✔
1639

1640
      @SuppressWarnings("deprecation")
1641
      ResolvingOobChannelBuilder builder = new ResolvingOobChannelBuilder()
1✔
1642
          .nameResolverFactory(nameResolverFactory);
1✔
1643

1644
      return builder
1✔
1645
          // TODO(zdapeng): executors should not outlive the parent channel.
1646
          .executor(executor)
1✔
1647
          .offloadExecutor(offloadExecutorHolder.getExecutor())
1✔
1648
          .maxTraceEvents(maxTraceEvents)
1✔
1649
          .proxyDetector(nameResolverArgs.getProxyDetector())
1✔
1650
          .userAgent(userAgent);
1✔
1651
    }
1652

1653
    @Override
1654
    public ChannelCredentials getUnsafeChannelCredentials() {
1655
      if (originalChannelCreds == null) {
×
1656
        return new DefaultChannelCreds();
×
1657
      }
1658
      return originalChannelCreds;
×
1659
    }
1660

1661
    @Override
1662
    public void updateOobChannelAddresses(ManagedChannel channel, EquivalentAddressGroup eag) {
1663
      updateOobChannelAddresses(channel, Collections.singletonList(eag));
×
1664
    }
×
1665

1666
    @Override
1667
    public void updateOobChannelAddresses(ManagedChannel channel,
1668
        List<EquivalentAddressGroup> eag) {
1669
      checkArgument(channel instanceof OobChannel,
1✔
1670
          "channel must have been returned from createOobChannel");
1671
      ((OobChannel) channel).updateAddresses(eag);
1✔
1672
    }
1✔
1673

1674
    @Override
1675
    public String getAuthority() {
1676
      return ManagedChannelImpl.this.authority();
1✔
1677
    }
1678

1679
    @Override
1680
    public SynchronizationContext getSynchronizationContext() {
1681
      return syncContext;
1✔
1682
    }
1683

1684
    @Override
1685
    public ScheduledExecutorService getScheduledExecutorService() {
1686
      return scheduledExecutor;
1✔
1687
    }
1688

1689
    @Override
1690
    public ChannelLogger getChannelLogger() {
1691
      return channelLogger;
1✔
1692
    }
1693

1694
    @Override
1695
    public NameResolver.Args getNameResolverArgs() {
1696
      return nameResolverArgs;
1✔
1697
    }
1698

1699
    @Override
1700
    public NameResolverRegistry getNameResolverRegistry() {
1701
      return nameResolverRegistry;
1✔
1702
    }
1703

1704
    /**
1705
     * A placeholder for channel creds if user did not specify channel creds for the channel.
1706
     */
1707
    // TODO(zdapeng): get rid of this class and let all ChannelBuilders always provide a non-null
1708
    //     channel creds.
1709
    final class DefaultChannelCreds extends ChannelCredentials {
1✔
1710
      @Override
1711
      public ChannelCredentials withoutBearerTokens() {
1712
        return this;
×
1713
      }
1714
    }
1715
  }
1716

1717
  final class NameResolverListener extends NameResolver.Listener2 {
1718
    final LbHelperImpl helper;
1719
    final NameResolver resolver;
1720

1721
    NameResolverListener(LbHelperImpl helperImpl, NameResolver resolver) {
1✔
1722
      this.helper = checkNotNull(helperImpl, "helperImpl");
1✔
1723
      this.resolver = checkNotNull(resolver, "resolver");
1✔
1724
    }
1✔
1725

1726
    @Override
1727
    public void onResult(final ResolutionResult resolutionResult) {
1728
      final class NamesResolved implements Runnable {
1✔
1729

1730
        @SuppressWarnings("ReferenceEquality")
1731
        @Override
1732
        public void run() {
1733
          if (ManagedChannelImpl.this.nameResolver != resolver) {
1✔
1734
            return;
1✔
1735
          }
1736

1737
          List<EquivalentAddressGroup> servers = resolutionResult.getAddresses();
1✔
1738
          channelLogger.log(
1✔
1739
              ChannelLogLevel.DEBUG,
1740
              "Resolved address: {0}, config={1}",
1741
              servers,
1742
              resolutionResult.getAttributes());
1✔
1743

1744
          if (lastResolutionState != ResolutionState.SUCCESS) {
1✔
1745
            channelLogger.log(ChannelLogLevel.INFO, "Address resolved: {0}", servers);
1✔
1746
            lastResolutionState = ResolutionState.SUCCESS;
1✔
1747
          }
1748

1749
          ConfigOrError configOrError = resolutionResult.getServiceConfig();
1✔
1750
          ResolutionResultListener resolutionResultListener = resolutionResult.getAttributes()
1✔
1751
              .get(RetryingNameResolver.RESOLUTION_RESULT_LISTENER_KEY);
1✔
1752
          InternalConfigSelector resolvedConfigSelector =
1✔
1753
              resolutionResult.getAttributes().get(InternalConfigSelector.KEY);
1✔
1754
          ManagedChannelServiceConfig validServiceConfig =
1755
              configOrError != null && configOrError.getConfig() != null
1✔
1756
                  ? (ManagedChannelServiceConfig) configOrError.getConfig()
1✔
1757
                  : null;
1✔
1758
          Status serviceConfigError = configOrError != null ? configOrError.getError() : null;
1✔
1759

1760
          ManagedChannelServiceConfig effectiveServiceConfig;
1761
          if (!lookUpServiceConfig) {
1✔
1762
            if (validServiceConfig != null) {
1✔
1763
              channelLogger.log(
1✔
1764
                  ChannelLogLevel.INFO,
1765
                  "Service config from name resolver discarded by channel settings");
1766
            }
1767
            effectiveServiceConfig =
1768
                defaultServiceConfig == null ? EMPTY_SERVICE_CONFIG : defaultServiceConfig;
1✔
1769
            if (resolvedConfigSelector != null) {
1✔
1770
              channelLogger.log(
1✔
1771
                  ChannelLogLevel.INFO,
1772
                  "Config selector from name resolver discarded by channel settings");
1773
            }
1774
            realChannel.updateConfigSelector(effectiveServiceConfig.getDefaultConfigSelector());
1✔
1775
          } else {
1776
            // Try to use config if returned from name resolver
1777
            // Otherwise, try to use the default config if available
1778
            if (validServiceConfig != null) {
1✔
1779
              effectiveServiceConfig = validServiceConfig;
1✔
1780
              if (resolvedConfigSelector != null) {
1✔
1781
                realChannel.updateConfigSelector(resolvedConfigSelector);
1✔
1782
                if (effectiveServiceConfig.getDefaultConfigSelector() != null) {
1✔
1783
                  channelLogger.log(
×
1784
                      ChannelLogLevel.DEBUG,
1785
                      "Method configs in service config will be discarded due to presence of"
1786
                          + "config-selector");
1787
                }
1788
              } else {
1789
                realChannel.updateConfigSelector(effectiveServiceConfig.getDefaultConfigSelector());
1✔
1790
              }
1791
            } else if (defaultServiceConfig != null) {
1✔
1792
              effectiveServiceConfig = defaultServiceConfig;
1✔
1793
              realChannel.updateConfigSelector(effectiveServiceConfig.getDefaultConfigSelector());
1✔
1794
              channelLogger.log(
1✔
1795
                  ChannelLogLevel.INFO,
1796
                  "Received no service config, using default service config");
1797
            } else if (serviceConfigError != null) {
1✔
1798
              if (!serviceConfigUpdated) {
1✔
1799
                // First DNS lookup has invalid service config, and cannot fall back to default
1800
                channelLogger.log(
1✔
1801
                    ChannelLogLevel.INFO,
1802
                    "Fallback to error due to invalid first service config without default config");
1803
                // This error could be an "inappropriate" control plane error that should not bleed
1804
                // through to client code using gRPC. We let them flow through here to the LB as
1805
                // we later check for these error codes when investigating pick results in
1806
                // GrpcUtil.getTransportFromPickResult().
1807
                onError(configOrError.getError());
1✔
1808
                if (resolutionResultListener != null) {
1✔
1809
                  resolutionResultListener.resolutionAttempted(false);
1✔
1810
                }
1811
                return;
1✔
1812
              } else {
1813
                effectiveServiceConfig = lastServiceConfig;
1✔
1814
              }
1815
            } else {
1816
              effectiveServiceConfig = EMPTY_SERVICE_CONFIG;
1✔
1817
              realChannel.updateConfigSelector(null);
1✔
1818
            }
1819
            if (!effectiveServiceConfig.equals(lastServiceConfig)) {
1✔
1820
              channelLogger.log(
1✔
1821
                  ChannelLogLevel.INFO,
1822
                  "Service config changed{0}",
1823
                  effectiveServiceConfig == EMPTY_SERVICE_CONFIG ? " to empty" : "");
1✔
1824
              lastServiceConfig = effectiveServiceConfig;
1✔
1825
              transportProvider.throttle = effectiveServiceConfig.getRetryThrottling();
1✔
1826
            }
1827

1828
            try {
1829
              // TODO(creamsoup): when `servers` is empty and lastResolutionStateCopy == SUCCESS
1830
              //  and lbNeedAddress, it shouldn't call the handleServiceConfigUpdate. But,
1831
              //  lbNeedAddress is not deterministic
1832
              serviceConfigUpdated = true;
1✔
1833
            } catch (RuntimeException re) {
×
1834
              logger.log(
×
1835
                  Level.WARNING,
1836
                  "[" + getLogId() + "] Unexpected exception from parsing service config",
×
1837
                  re);
1838
            }
1✔
1839
          }
1840

1841
          Attributes effectiveAttrs = resolutionResult.getAttributes();
1✔
1842
          // Call LB only if it's not shutdown.  If LB is shutdown, lbHelper won't match.
1843
          if (NameResolverListener.this.helper == ManagedChannelImpl.this.lbHelper) {
1✔
1844
            Attributes.Builder attrBuilder =
1✔
1845
                effectiveAttrs.toBuilder().discard(InternalConfigSelector.KEY);
1✔
1846
            Map<String, ?> healthCheckingConfig =
1✔
1847
                effectiveServiceConfig.getHealthCheckingConfig();
1✔
1848
            if (healthCheckingConfig != null) {
1✔
1849
              attrBuilder
1✔
1850
                  .set(LoadBalancer.ATTR_HEALTH_CHECKING_CONFIG, healthCheckingConfig)
1✔
1851
                  .build();
1✔
1852
            }
1853
            Attributes attributes = attrBuilder.build();
1✔
1854

1855
            boolean lastAddressesAccepted = helper.lb.tryAcceptResolvedAddresses(
1✔
1856
                ResolvedAddresses.newBuilder()
1✔
1857
                    .setAddresses(servers)
1✔
1858
                    .setAttributes(attributes)
1✔
1859
                    .setLoadBalancingPolicyConfig(effectiveServiceConfig.getLoadBalancingConfig())
1✔
1860
                    .build());
1✔
1861
            // If a listener is provided, let it know if the addresses were accepted.
1862
            if (resolutionResultListener != null) {
1✔
1863
              resolutionResultListener.resolutionAttempted(lastAddressesAccepted);
1✔
1864
            }
1865
          }
1866
        }
1✔
1867
      }
1868

1869
      syncContext.execute(new NamesResolved());
1✔
1870
    }
1✔
1871

1872
    @Override
1873
    public void onError(final Status error) {
1874
      checkArgument(!error.isOk(), "the error status must not be OK");
1✔
1875
      final class NameResolverErrorHandler implements Runnable {
1✔
1876
        @Override
1877
        public void run() {
1878
          handleErrorInSyncContext(error);
1✔
1879
        }
1✔
1880
      }
1881

1882
      syncContext.execute(new NameResolverErrorHandler());
1✔
1883
    }
1✔
1884

1885
    private void handleErrorInSyncContext(Status error) {
1886
      logger.log(Level.WARNING, "[{0}] Failed to resolve name. status={1}",
1✔
1887
          new Object[] {getLogId(), error});
1✔
1888
      realChannel.onConfigError();
1✔
1889
      if (lastResolutionState != ResolutionState.ERROR) {
1✔
1890
        channelLogger.log(ChannelLogLevel.WARNING, "Failed to resolve name: {0}", error);
1✔
1891
        lastResolutionState = ResolutionState.ERROR;
1✔
1892
      }
1893
      // Call LB only if it's not shutdown.  If LB is shutdown, lbHelper won't match.
1894
      if (NameResolverListener.this.helper != ManagedChannelImpl.this.lbHelper) {
1✔
1895
        return;
1✔
1896
      }
1897

1898
      helper.lb.handleNameResolutionError(error);
1✔
1899
    }
1✔
1900
  }
1901

1902
  private final class SubchannelImpl extends AbstractSubchannel {
1903
    final CreateSubchannelArgs args;
1904
    final InternalLogId subchannelLogId;
1905
    final ChannelLoggerImpl subchannelLogger;
1906
    final ChannelTracer subchannelTracer;
1907
    List<EquivalentAddressGroup> addressGroups;
1908
    InternalSubchannel subchannel;
1909
    boolean started;
1910
    boolean shutdown;
1911
    ScheduledHandle delayedShutdownTask;
1912

1913
    SubchannelImpl(CreateSubchannelArgs args) {
1✔
1914
      checkNotNull(args, "args");
1✔
1915
      addressGroups = args.getAddresses();
1✔
1916
      if (authorityOverride != null) {
1✔
1917
        List<EquivalentAddressGroup> eagsWithoutOverrideAttr =
1✔
1918
            stripOverrideAuthorityAttributes(args.getAddresses());
1✔
1919
        args = args.toBuilder().setAddresses(eagsWithoutOverrideAttr).build();
1✔
1920
      }
1921
      this.args = args;
1✔
1922
      subchannelLogId = InternalLogId.allocate("Subchannel", /*details=*/ authority());
1✔
1923
      subchannelTracer = new ChannelTracer(
1✔
1924
          subchannelLogId, maxTraceEvents, timeProvider.currentTimeNanos(),
1✔
1925
          "Subchannel for " + args.getAddresses());
1✔
1926
      subchannelLogger = new ChannelLoggerImpl(subchannelTracer, timeProvider);
1✔
1927
    }
1✔
1928

1929
    @Override
1930
    public void start(final SubchannelStateListener listener) {
1931
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1932
      checkState(!started, "already started");
1✔
1933
      checkState(!shutdown, "already shutdown");
1✔
1934
      checkState(!terminating, "Channel is being terminated");
1✔
1935
      started = true;
1✔
1936
      final class ManagedInternalSubchannelCallback extends InternalSubchannel.Callback {
1✔
1937
        // All callbacks are run in syncContext
1938
        @Override
1939
        void onTerminated(InternalSubchannel is) {
1940
          subchannels.remove(is);
1✔
1941
          channelz.removeSubchannel(is);
1✔
1942
          maybeTerminateChannel();
1✔
1943
        }
1✔
1944

1945
        @Override
1946
        void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) {
1947
          checkState(listener != null, "listener is null");
1✔
1948
          listener.onSubchannelState(newState);
1✔
1949
        }
1✔
1950

1951
        @Override
1952
        void onInUse(InternalSubchannel is) {
1953
          inUseStateAggregator.updateObjectInUse(is, true);
1✔
1954
        }
1✔
1955

1956
        @Override
1957
        void onNotInUse(InternalSubchannel is) {
1958
          inUseStateAggregator.updateObjectInUse(is, false);
1✔
1959
        }
1✔
1960
      }
1961

1962
      final InternalSubchannel internalSubchannel = new InternalSubchannel(
1✔
1963
          args.getAddresses(),
1✔
1964
          authority(),
1✔
1965
          userAgent,
1✔
1966
          backoffPolicyProvider,
1✔
1967
          transportFactory,
1✔
1968
          transportFactory.getScheduledExecutorService(),
1✔
1969
          stopwatchSupplier,
1✔
1970
          syncContext,
1971
          new ManagedInternalSubchannelCallback(),
1972
          channelz,
1✔
1973
          callTracerFactory.create(),
1✔
1974
          subchannelTracer,
1975
          subchannelLogId,
1976
          subchannelLogger);
1977

1978
      channelTracer.reportEvent(new ChannelTrace.Event.Builder()
1✔
1979
          .setDescription("Child Subchannel started")
1✔
1980
          .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
1✔
1981
          .setTimestampNanos(timeProvider.currentTimeNanos())
1✔
1982
          .setSubchannelRef(internalSubchannel)
1✔
1983
          .build());
1✔
1984

1985
      this.subchannel = internalSubchannel;
1✔
1986
      channelz.addSubchannel(internalSubchannel);
1✔
1987
      subchannels.add(internalSubchannel);
1✔
1988
    }
1✔
1989

1990
    @Override
1991
    InternalInstrumented<ChannelStats> getInstrumentedInternalSubchannel() {
1992
      checkState(started, "not started");
1✔
1993
      return subchannel;
1✔
1994
    }
1995

1996
    @Override
1997
    public void shutdown() {
1998
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1999
      if (subchannel == null) {
1✔
2000
        // start() was not successful
2001
        shutdown = true;
×
2002
        return;
×
2003
      }
2004
      if (shutdown) {
1✔
2005
        if (terminating && delayedShutdownTask != null) {
1✔
2006
          // shutdown() was previously called when terminating == false, thus a delayed shutdown()
2007
          // was scheduled.  Now since terminating == true, We should expedite the shutdown.
2008
          delayedShutdownTask.cancel();
×
2009
          delayedShutdownTask = null;
×
2010
          // Will fall through to the subchannel.shutdown() at the end.
2011
        } else {
2012
          return;
1✔
2013
        }
2014
      } else {
2015
        shutdown = true;
1✔
2016
      }
2017
      // Add a delay to shutdown to deal with the race between 1) a transport being picked and
2018
      // newStream() being called on it, and 2) its Subchannel is shut down by LoadBalancer (e.g.,
2019
      // because of address change, or because LoadBalancer is shutdown by Channel entering idle
2020
      // mode). If (2) wins, the app will see a spurious error. We work around this by delaying
2021
      // shutdown of Subchannel for a few seconds here.
2022
      //
2023
      // TODO(zhangkun83): consider a better approach
2024
      // (https://github.com/grpc/grpc-java/issues/2562).
2025
      if (!terminating) {
1✔
2026
        final class ShutdownSubchannel implements Runnable {
1✔
2027
          @Override
2028
          public void run() {
2029
            subchannel.shutdown(SUBCHANNEL_SHUTDOWN_STATUS);
1✔
2030
          }
1✔
2031
        }
2032

2033
        delayedShutdownTask = syncContext.schedule(
1✔
2034
            new LogExceptionRunnable(new ShutdownSubchannel()),
2035
            SUBCHANNEL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS,
2036
            transportFactory.getScheduledExecutorService());
1✔
2037
        return;
1✔
2038
      }
2039
      // When terminating == true, no more real streams will be created. It's safe and also
2040
      // desirable to shutdown timely.
2041
      subchannel.shutdown(SHUTDOWN_STATUS);
1✔
2042
    }
1✔
2043

2044
    @Override
2045
    public void requestConnection() {
2046
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
2047
      checkState(started, "not started");
1✔
2048
      subchannel.obtainActiveTransport();
1✔
2049
    }
1✔
2050

2051
    @Override
2052
    public List<EquivalentAddressGroup> getAllAddresses() {
2053
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
2054
      checkState(started, "not started");
1✔
2055
      return addressGroups;
1✔
2056
    }
2057

2058
    @Override
2059
    public Attributes getAttributes() {
2060
      return args.getAttributes();
1✔
2061
    }
2062

2063
    @Override
2064
    public String toString() {
2065
      return subchannelLogId.toString();
1✔
2066
    }
2067

2068
    @Override
2069
    public Channel asChannel() {
2070
      checkState(started, "not started");
1✔
2071
      return new SubchannelChannel(
1✔
2072
          subchannel, balancerRpcExecutorHolder.getExecutor(),
1✔
2073
          transportFactory.getScheduledExecutorService(),
1✔
2074
          callTracerFactory.create(),
1✔
2075
          new AtomicReference<InternalConfigSelector>(null));
2076
    }
2077

2078
    @Override
2079
    public Object getInternalSubchannel() {
2080
      checkState(started, "Subchannel is not started");
1✔
2081
      return subchannel;
1✔
2082
    }
2083

2084
    @Override
2085
    public ChannelLogger getChannelLogger() {
2086
      return subchannelLogger;
1✔
2087
    }
2088

2089
    @Override
2090
    public void updateAddresses(List<EquivalentAddressGroup> addrs) {
2091
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
2092
      addressGroups = addrs;
1✔
2093
      if (authorityOverride != null) {
1✔
2094
        addrs = stripOverrideAuthorityAttributes(addrs);
1✔
2095
      }
2096
      subchannel.updateAddresses(addrs);
1✔
2097
    }
1✔
2098

2099
    private List<EquivalentAddressGroup> stripOverrideAuthorityAttributes(
2100
        List<EquivalentAddressGroup> eags) {
2101
      List<EquivalentAddressGroup> eagsWithoutOverrideAttr = new ArrayList<>();
1✔
2102
      for (EquivalentAddressGroup eag : eags) {
1✔
2103
        EquivalentAddressGroup eagWithoutOverrideAttr = new EquivalentAddressGroup(
1✔
2104
            eag.getAddresses(),
1✔
2105
            eag.getAttributes().toBuilder().discard(ATTR_AUTHORITY_OVERRIDE).build());
1✔
2106
        eagsWithoutOverrideAttr.add(eagWithoutOverrideAttr);
1✔
2107
      }
1✔
2108
      return Collections.unmodifiableList(eagsWithoutOverrideAttr);
1✔
2109
    }
2110
  }
2111

2112
  @Override
2113
  public String toString() {
2114
    return MoreObjects.toStringHelper(this)
1✔
2115
        .add("logId", logId.getId())
1✔
2116
        .add("target", target)
1✔
2117
        .toString();
1✔
2118
  }
2119

2120
  /**
2121
   * Called from syncContext.
2122
   */
2123
  private final class DelayedTransportListener implements ManagedClientTransport.Listener {
1✔
2124
    @Override
2125
    public void transportShutdown(Status s) {
2126
      checkState(shutdown.get(), "Channel must have been shut down");
1✔
2127
    }
1✔
2128

2129
    @Override
2130
    public void transportReady() {
2131
      // Don't care
2132
    }
×
2133

2134
    @Override
2135
    public void transportInUse(final boolean inUse) {
2136
      inUseStateAggregator.updateObjectInUse(delayedTransport, inUse);
1✔
2137
    }
1✔
2138

2139
    @Override
2140
    public void transportTerminated() {
2141
      checkState(shutdown.get(), "Channel must have been shut down");
1✔
2142
      terminating = true;
1✔
2143
      shutdownNameResolverAndLoadBalancer(false);
1✔
2144
      // No need to call channelStateManager since we are already in SHUTDOWN state.
2145
      // Until LoadBalancer is shutdown, it may still create new subchannels.  We catch them
2146
      // here.
2147
      maybeShutdownNowSubchannels();
1✔
2148
      maybeTerminateChannel();
1✔
2149
    }
1✔
2150
  }
2151

2152
  /**
2153
   * Must be accessed from syncContext.
2154
   */
2155
  private final class IdleModeStateAggregator extends InUseStateAggregator<Object> {
1✔
2156
    @Override
2157
    protected void handleInUse() {
2158
      exitIdleMode();
1✔
2159
    }
1✔
2160

2161
    @Override
2162
    protected void handleNotInUse() {
2163
      if (shutdown.get()) {
1✔
2164
        return;
1✔
2165
      }
2166
      rescheduleIdleTimer();
1✔
2167
    }
1✔
2168
  }
2169

2170
  /**
2171
   * Lazily request for Executor from an executor pool.
2172
   * Also act as an Executor directly to simply run a cmd
2173
   */
2174
  @VisibleForTesting
2175
  static final class ExecutorHolder implements Executor {
2176
    private final ObjectPool<? extends Executor> pool;
2177
    private Executor executor;
2178

2179
    ExecutorHolder(ObjectPool<? extends Executor> executorPool) {
1✔
2180
      this.pool = checkNotNull(executorPool, "executorPool");
1✔
2181
    }
1✔
2182

2183
    synchronized Executor getExecutor() {
2184
      if (executor == null) {
1✔
2185
        executor = checkNotNull(pool.getObject(), "%s.getObject()", executor);
1✔
2186
      }
2187
      return executor;
1✔
2188
    }
2189

2190
    synchronized void release() {
2191
      if (executor != null) {
1✔
2192
        executor = pool.returnObject(executor);
1✔
2193
      }
2194
    }
1✔
2195

2196
    @Override
2197
    public void execute(Runnable command) {
2198
      getExecutor().execute(command);
1✔
2199
    }
1✔
2200
  }
2201

2202
  private static final class RestrictedScheduledExecutor implements ScheduledExecutorService {
2203
    final ScheduledExecutorService delegate;
2204

2205
    private RestrictedScheduledExecutor(ScheduledExecutorService delegate) {
1✔
2206
      this.delegate = checkNotNull(delegate, "delegate");
1✔
2207
    }
1✔
2208

2209
    @Override
2210
    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
2211
      return delegate.schedule(callable, delay, unit);
×
2212
    }
2213

2214
    @Override
2215
    public ScheduledFuture<?> schedule(Runnable cmd, long delay, TimeUnit unit) {
2216
      return delegate.schedule(cmd, delay, unit);
1✔
2217
    }
2218

2219
    @Override
2220
    public ScheduledFuture<?> scheduleAtFixedRate(
2221
        Runnable command, long initialDelay, long period, TimeUnit unit) {
2222
      return delegate.scheduleAtFixedRate(command, initialDelay, period, unit);
×
2223
    }
2224

2225
    @Override
2226
    public ScheduledFuture<?> scheduleWithFixedDelay(
2227
        Runnable command, long initialDelay, long delay, TimeUnit unit) {
2228
      return delegate.scheduleWithFixedDelay(command, initialDelay, delay, unit);
×
2229
    }
2230

2231
    @Override
2232
    public boolean awaitTermination(long timeout, TimeUnit unit)
2233
        throws InterruptedException {
2234
      return delegate.awaitTermination(timeout, unit);
×
2235
    }
2236

2237
    @Override
2238
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
2239
        throws InterruptedException {
2240
      return delegate.invokeAll(tasks);
×
2241
    }
2242

2243
    @Override
2244
    public <T> List<Future<T>> invokeAll(
2245
        Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
2246
        throws InterruptedException {
2247
      return delegate.invokeAll(tasks, timeout, unit);
×
2248
    }
2249

2250
    @Override
2251
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
2252
        throws InterruptedException, ExecutionException {
2253
      return delegate.invokeAny(tasks);
×
2254
    }
2255

2256
    @Override
2257
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
2258
        throws InterruptedException, ExecutionException, TimeoutException {
2259
      return delegate.invokeAny(tasks, timeout, unit);
×
2260
    }
2261

2262
    @Override
2263
    public boolean isShutdown() {
2264
      return delegate.isShutdown();
×
2265
    }
2266

2267
    @Override
2268
    public boolean isTerminated() {
2269
      return delegate.isTerminated();
×
2270
    }
2271

2272
    @Override
2273
    public void shutdown() {
2274
      throw new UnsupportedOperationException("Restricted: shutdown() is not allowed");
1✔
2275
    }
2276

2277
    @Override
2278
    public List<Runnable> shutdownNow() {
2279
      throw new UnsupportedOperationException("Restricted: shutdownNow() is not allowed");
1✔
2280
    }
2281

2282
    @Override
2283
    public <T> Future<T> submit(Callable<T> task) {
2284
      return delegate.submit(task);
×
2285
    }
2286

2287
    @Override
2288
    public Future<?> submit(Runnable task) {
2289
      return delegate.submit(task);
×
2290
    }
2291

2292
    @Override
2293
    public <T> Future<T> submit(Runnable task, T result) {
2294
      return delegate.submit(task, result);
×
2295
    }
2296

2297
    @Override
2298
    public void execute(Runnable command) {
2299
      delegate.execute(command);
×
2300
    }
×
2301
  }
2302

2303
  /**
2304
   * A ResolutionState indicates the status of last name resolution.
2305
   */
2306
  enum ResolutionState {
1✔
2307
    NO_RESOLUTION,
1✔
2308
    SUCCESS,
1✔
2309
    ERROR
1✔
2310
  }
2311
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc