• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #18720

pending completion
#18720

push

github-actions

web-flow
xds: Encode the service authority in XdsNameResolver (#10207)

Encode the service authority in the xDS name resolver before passing it to the gRPC util, so that xDS targets containing multiple slashes are handled correctly. Example: xds:///path/to/service:port.

Because the underlying Java URI library currently does not split the encoded authority into host and port correctly, simplify the check to look only for '@': we are only interested in detecting user info in order to validate the authority for HTTP.

This change also requires a few changes to unit tests that relied on this check to reject invalid authorities, which will now be considered valid.

Just like #9376, depending on Guava packages such as URLEscapers or PercentEscapers leads to internal failures (e.g. "Unresolvable reference to com.google.common.escape.Escaper from io.grpc.internal.GrpcUtil"). To avoid these issues, create an in-house version that is heavily inspired by grpc-go/grpc.

30655 of 34740 relevant lines covered (88.24%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.27
/../core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
1
/*
2
 * Copyright 2016 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static com.google.common.base.Preconditions.checkState;
22
import static io.grpc.ClientStreamTracer.NAME_RESOLUTION_DELAYED;
23
import static io.grpc.ConnectivityState.IDLE;
24
import static io.grpc.ConnectivityState.SHUTDOWN;
25
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
26
import static io.grpc.EquivalentAddressGroup.ATTR_AUTHORITY_OVERRIDE;
27

28
import com.google.common.annotations.VisibleForTesting;
29
import com.google.common.base.MoreObjects;
30
import com.google.common.base.Stopwatch;
31
import com.google.common.base.Supplier;
32
import com.google.common.util.concurrent.ListenableFuture;
33
import com.google.common.util.concurrent.SettableFuture;
34
import io.grpc.Attributes;
35
import io.grpc.CallCredentials;
36
import io.grpc.CallOptions;
37
import io.grpc.Channel;
38
import io.grpc.ChannelCredentials;
39
import io.grpc.ChannelLogger;
40
import io.grpc.ChannelLogger.ChannelLogLevel;
41
import io.grpc.ClientCall;
42
import io.grpc.ClientInterceptor;
43
import io.grpc.ClientInterceptors;
44
import io.grpc.ClientStreamTracer;
45
import io.grpc.CompressorRegistry;
46
import io.grpc.ConnectivityState;
47
import io.grpc.ConnectivityStateInfo;
48
import io.grpc.Context;
49
import io.grpc.DecompressorRegistry;
50
import io.grpc.EquivalentAddressGroup;
51
import io.grpc.ForwardingChannelBuilder;
52
import io.grpc.ForwardingClientCall;
53
import io.grpc.Grpc;
54
import io.grpc.InternalChannelz;
55
import io.grpc.InternalChannelz.ChannelStats;
56
import io.grpc.InternalChannelz.ChannelTrace;
57
import io.grpc.InternalConfigSelector;
58
import io.grpc.InternalInstrumented;
59
import io.grpc.InternalLogId;
60
import io.grpc.InternalWithLogId;
61
import io.grpc.LoadBalancer;
62
import io.grpc.LoadBalancer.CreateSubchannelArgs;
63
import io.grpc.LoadBalancer.PickResult;
64
import io.grpc.LoadBalancer.PickSubchannelArgs;
65
import io.grpc.LoadBalancer.ResolvedAddresses;
66
import io.grpc.LoadBalancer.SubchannelPicker;
67
import io.grpc.LoadBalancer.SubchannelStateListener;
68
import io.grpc.ManagedChannel;
69
import io.grpc.ManagedChannelBuilder;
70
import io.grpc.Metadata;
71
import io.grpc.MethodDescriptor;
72
import io.grpc.NameResolver;
73
import io.grpc.NameResolver.ConfigOrError;
74
import io.grpc.NameResolver.ResolutionResult;
75
import io.grpc.NameResolverRegistry;
76
import io.grpc.ProxyDetector;
77
import io.grpc.Status;
78
import io.grpc.SynchronizationContext;
79
import io.grpc.SynchronizationContext.ScheduledHandle;
80
import io.grpc.internal.AutoConfiguredLoadBalancerFactory.AutoConfiguredLoadBalancer;
81
import io.grpc.internal.ClientCallImpl.ClientStreamProvider;
82
import io.grpc.internal.ClientTransportFactory.SwapChannelCredentialsResult;
83
import io.grpc.internal.ManagedChannelImplBuilder.ClientTransportFactoryBuilder;
84
import io.grpc.internal.ManagedChannelImplBuilder.FixedPortProvider;
85
import io.grpc.internal.ManagedChannelServiceConfig.MethodInfo;
86
import io.grpc.internal.ManagedChannelServiceConfig.ServiceConfigConvertedSelector;
87
import io.grpc.internal.RetriableStream.ChannelBufferMeter;
88
import io.grpc.internal.RetriableStream.Throttle;
89
import io.grpc.internal.RetryingNameResolver.ResolutionResultListener;
90
import java.net.URI;
91
import java.net.URISyntaxException;
92
import java.util.ArrayList;
93
import java.util.Collection;
94
import java.util.Collections;
95
import java.util.HashSet;
96
import java.util.LinkedHashSet;
97
import java.util.List;
98
import java.util.Map;
99
import java.util.Set;
100
import java.util.concurrent.Callable;
101
import java.util.concurrent.CountDownLatch;
102
import java.util.concurrent.ExecutionException;
103
import java.util.concurrent.Executor;
104
import java.util.concurrent.Future;
105
import java.util.concurrent.ScheduledExecutorService;
106
import java.util.concurrent.ScheduledFuture;
107
import java.util.concurrent.TimeUnit;
108
import java.util.concurrent.TimeoutException;
109
import java.util.concurrent.atomic.AtomicBoolean;
110
import java.util.concurrent.atomic.AtomicReference;
111
import java.util.logging.Level;
112
import java.util.logging.Logger;
113
import java.util.regex.Pattern;
114
import javax.annotation.Nullable;
115
import javax.annotation.concurrent.GuardedBy;
116
import javax.annotation.concurrent.ThreadSafe;
117

118
/** A communication channel for making outgoing RPCs. */
119
@ThreadSafe
120
final class ManagedChannelImpl extends ManagedChannel implements
121
    InternalInstrumented<ChannelStats> {
122
  @VisibleForTesting
123
  static final Logger logger = Logger.getLogger(ManagedChannelImpl.class.getName());
1✔
124

125
  // Matching this pattern means the target string is a URI target or at least intended to be one.
126
  // A URI target must be an absolute hierarchical URI.
127
  // From RFC 2396: scheme = alpha *( alpha | digit | "+" | "-" | "." )
128
  @VisibleForTesting
129
  static final Pattern URI_PATTERN = Pattern.compile("[a-zA-Z][a-zA-Z0-9+.-]*:/.*");
1✔
130

131
  static final long IDLE_TIMEOUT_MILLIS_DISABLE = -1;
132

133
  static final long SUBCHANNEL_SHUTDOWN_DELAY_SECONDS = 5;
134

135
  @VisibleForTesting
136
  static final Status SHUTDOWN_NOW_STATUS =
1✔
137
      Status.UNAVAILABLE.withDescription("Channel shutdownNow invoked");
1✔
138

139
  @VisibleForTesting
140
  static final Status SHUTDOWN_STATUS =
1✔
141
      Status.UNAVAILABLE.withDescription("Channel shutdown invoked");
1✔
142

143
  @VisibleForTesting
144
  static final Status SUBCHANNEL_SHUTDOWN_STATUS =
1✔
145
      Status.UNAVAILABLE.withDescription("Subchannel shutdown invoked");
1✔
146

147
  private static final ManagedChannelServiceConfig EMPTY_SERVICE_CONFIG =
148
      ManagedChannelServiceConfig.empty();
1✔
149
  private static final InternalConfigSelector INITIAL_PENDING_SELECTOR =
1✔
150
      new InternalConfigSelector() {
1✔
151
        @Override
152
        public Result selectConfig(PickSubchannelArgs args) {
153
          throw new IllegalStateException("Resolution is pending");
×
154
        }
155
      };
156

157
  private final InternalLogId logId;
158
  private final String target;
159
  @Nullable
160
  private final String authorityOverride;
161
  private final NameResolverRegistry nameResolverRegistry;
162
  private final NameResolver.Factory nameResolverFactory;
163
  private final NameResolver.Args nameResolverArgs;
164
  private final AutoConfiguredLoadBalancerFactory loadBalancerFactory;
165
  private final ClientTransportFactory originalTransportFactory;
166
  @Nullable
167
  private final ChannelCredentials originalChannelCreds;
168
  private final ClientTransportFactory transportFactory;
169
  private final ClientTransportFactory oobTransportFactory;
170
  private final RestrictedScheduledExecutor scheduledExecutor;
171
  private final Executor executor;
172
  private final ObjectPool<? extends Executor> executorPool;
173
  private final ObjectPool<? extends Executor> balancerRpcExecutorPool;
174
  private final ExecutorHolder balancerRpcExecutorHolder;
175
  private final ExecutorHolder offloadExecutorHolder;
176
  private final TimeProvider timeProvider;
177
  private final int maxTraceEvents;
178

179
  @VisibleForTesting
1✔
180
  final SynchronizationContext syncContext = new SynchronizationContext(
181
      new Thread.UncaughtExceptionHandler() {
1✔
182
        @Override
183
        public void uncaughtException(Thread t, Throwable e) {
184
          logger.log(
1✔
185
              Level.SEVERE,
186
              "[" + getLogId() + "] Uncaught exception in the SynchronizationContext. Panic!",
1✔
187
              e);
188
          panic(e);
1✔
189
        }
1✔
190
      });
191

192
  private boolean fullStreamDecompression;
193

194
  private final DecompressorRegistry decompressorRegistry;
195
  private final CompressorRegistry compressorRegistry;
196

197
  private final Supplier<Stopwatch> stopwatchSupplier;
198
  /** The timeout before entering idle mode. */
199
  private final long idleTimeoutMillis;
200

201
  private final ConnectivityStateManager channelStateManager = new ConnectivityStateManager();
1✔
202
  private final BackoffPolicy.Provider backoffPolicyProvider;
203

204
  /**
205
   * We delegate to this channel, so that we can have interceptors as necessary. If there aren't
206
   * any interceptors and the {@link io.grpc.BinaryLog} is {@code null} then this will just be a
207
   * {@link RealChannel}.
208
   */
209
  private final Channel interceptorChannel;
210
  @Nullable private final String userAgent;
211

212
  // Only null after channel is terminated. Must be assigned from the syncContext.
213
  private NameResolver nameResolver;
214

215
  // Must be accessed from the syncContext.
216
  private boolean nameResolverStarted;
217

218
  // null when channel is in idle mode.  Must be assigned from syncContext.
219
  @Nullable
220
  private LbHelperImpl lbHelper;
221

222
  // Must ONLY be assigned from updateSubchannelPicker(), which is called from syncContext.
223
  // null if channel is in idle mode.
224
  @Nullable
225
  private volatile SubchannelPicker subchannelPicker;
226

227
  // Must be accessed from the syncContext
228
  private boolean panicMode;
229

230
  // Must be mutated from syncContext
231
  // If any monitoring hook to be added later needs to get a snapshot of this Set, we could
232
  // switch to a ConcurrentHashMap.
233
  private final Set<InternalSubchannel> subchannels = new HashSet<>(16, .75f);
1✔
234

235
  // Must be accessed from syncContext
236
  @Nullable
237
  private Collection<RealChannel.PendingCall<?, ?>> pendingCalls;
238
  private final Object pendingCallsInUseObject = new Object();
1✔
239

240
  // Must be mutated from syncContext
241
  private final Set<OobChannel> oobChannels = new HashSet<>(1, .75f);
1✔
242

243
  // reprocess() must be run from syncContext
244
  private final DelayedClientTransport delayedTransport;
245
  private final UncommittedRetriableStreamsRegistry uncommittedRetriableStreamsRegistry
1✔
246
      = new UncommittedRetriableStreamsRegistry();
247

248
  // Shutdown states.
249
  //
250
  // Channel's shutdown process:
251
  // 1. shutdown(): stop accepting new calls from applications
252
  //   1a shutdown <- true
253
  //   1b subchannelPicker <- null
254
  //   1c delayedTransport.shutdown()
255
  // 2. delayedTransport terminated: stop stream-creation functionality
256
  //   2a terminating <- true
257
  //   2b loadBalancer.shutdown()
258
  //     * LoadBalancer will shutdown subchannels and OOB channels
259
  //   2c loadBalancer <- null
260
  //   2d nameResolver.shutdown()
261
  //   2e nameResolver <- null
262
  // 3. All subchannels and OOB channels terminated: Channel considered terminated
263

264
  private final AtomicBoolean shutdown = new AtomicBoolean(false);
1✔
265
  // Must only be mutated and read from syncContext
266
  private boolean shutdownNowed;
267
  // Must only be mutated from syncContext
268
  private boolean terminating;
269
  // Must be mutated from syncContext
270
  private volatile boolean terminated;
271
  private final CountDownLatch terminatedLatch = new CountDownLatch(1);
1✔
272

273
  private final CallTracer.Factory callTracerFactory;
274
  private final CallTracer channelCallTracer;
275
  private final ChannelTracer channelTracer;
276
  private final ChannelLogger channelLogger;
277
  private final InternalChannelz channelz;
278
  private final RealChannel realChannel;
279
  // Must be mutated and read from syncContext
280
  // a flag for doing channel tracing when flipped
281
  private ResolutionState lastResolutionState = ResolutionState.NO_RESOLUTION;
1✔
282
  // Must be mutated and read from constructor or syncContext
283
  // used for channel tracing when value changed
284
  private ManagedChannelServiceConfig lastServiceConfig = EMPTY_SERVICE_CONFIG;
1✔
285

286
  @Nullable
287
  private final ManagedChannelServiceConfig defaultServiceConfig;
288
  // Must be mutated and read from constructor or syncContext
289
  private boolean serviceConfigUpdated = false;
1✔
290
  private final boolean lookUpServiceConfig;
291

292
  // One instance per channel.
293
  private final ChannelBufferMeter channelBufferUsed = new ChannelBufferMeter();
1✔
294

295
  private final long perRpcBufferLimit;
296
  private final long channelBufferLimit;
297

298
  // Temporary false flag that can skip the retry code path.
299
  private final boolean retryEnabled;
300

301
  // Called from syncContext
302
  private final ManagedClientTransport.Listener delayedTransportListener =
1✔
303
      new DelayedTransportListener();
304

305
  // Must be called from syncContext
306
  private void maybeShutdownNowSubchannels() {
307
    if (shutdownNowed) {
1✔
308
      for (InternalSubchannel subchannel : subchannels) {
1✔
309
        subchannel.shutdownNow(SHUTDOWN_NOW_STATUS);
1✔
310
      }
1✔
311
      for (OobChannel oobChannel : oobChannels) {
1✔
312
        oobChannel.getInternalSubchannel().shutdownNow(SHUTDOWN_NOW_STATUS);
1✔
313
      }
1✔
314
    }
315
  }
1✔
316

317
  // Must be accessed from syncContext
318
  @VisibleForTesting
1✔
319
  final InUseStateAggregator<Object> inUseStateAggregator = new IdleModeStateAggregator();
320

321
  @Override
322
  public ListenableFuture<ChannelStats> getStats() {
323
    final SettableFuture<ChannelStats> ret = SettableFuture.create();
1✔
324
    final class StatsFetcher implements Runnable {
1✔
325
      @Override
326
      public void run() {
327
        ChannelStats.Builder builder = new InternalChannelz.ChannelStats.Builder();
1✔
328
        channelCallTracer.updateBuilder(builder);
1✔
329
        channelTracer.updateBuilder(builder);
1✔
330
        builder.setTarget(target).setState(channelStateManager.getState());
1✔
331
        List<InternalWithLogId> children = new ArrayList<>();
1✔
332
        children.addAll(subchannels);
1✔
333
        children.addAll(oobChannels);
1✔
334
        builder.setSubchannels(children);
1✔
335
        ret.set(builder.build());
1✔
336
      }
1✔
337
    }
338

339
    // subchannels and oobchannels can only be accessed from syncContext
340
    syncContext.execute(new StatsFetcher());
1✔
341
    return ret;
1✔
342
  }
343

344
  @Override
345
  public InternalLogId getLogId() {
346
    return logId;
1✔
347
  }
348

349
  // Run from syncContext
350
  private class IdleModeTimer implements Runnable {
1✔
351

352
    @Override
353
    public void run() {
354
      // Workaround timer scheduled while in idle mode. This can happen from handleNotInUse() after
355
      // an explicit enterIdleMode() by the user. Protecting here as other locations are a bit too
356
      // subtle to change rapidly to resolve the channel panic. See #8714
357
      if (lbHelper == null) {
1✔
358
        return;
×
359
      }
360
      enterIdleMode();
1✔
361
    }
1✔
362
  }
363

364
  // Must be called from syncContext
365
  private void shutdownNameResolverAndLoadBalancer(boolean channelIsActive) {
366
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
367
    if (channelIsActive) {
1✔
368
      checkState(nameResolverStarted, "nameResolver is not started");
1✔
369
      checkState(lbHelper != null, "lbHelper is null");
1✔
370
    }
371
    if (nameResolver != null) {
1✔
372
      nameResolver.shutdown();
1✔
373
      nameResolverStarted = false;
1✔
374
      if (channelIsActive) {
1✔
375
        nameResolver = getNameResolver(
1✔
376
            target, authorityOverride, nameResolverFactory, nameResolverArgs);
377
      } else {
378
        nameResolver = null;
1✔
379
      }
380
    }
381
    if (lbHelper != null) {
1✔
382
      lbHelper.lb.shutdown();
1✔
383
      lbHelper = null;
1✔
384
    }
385
    subchannelPicker = null;
1✔
386
  }
1✔
387

388
  /**
389
   * Make the channel exit idle mode, if it's in it.
390
   *
391
   * <p>Must be called from syncContext
392
   */
393
  @VisibleForTesting
394
  void exitIdleMode() {
395
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
396
    if (shutdown.get() || panicMode) {
1✔
397
      return;
1✔
398
    }
399
    if (inUseStateAggregator.isInUse()) {
1✔
400
      // Cancel the timer now, so that a racing due timer will not put Channel on idleness
401
      // when the caller of exitIdleMode() is about to use the returned loadBalancer.
402
      cancelIdleTimer(false);
1✔
403
    } else {
404
      // exitIdleMode() may be called outside of inUseStateAggregator.handleNotInUse() while
405
      // isInUse() == false, in which case we still need to schedule the timer.
406
      rescheduleIdleTimer();
1✔
407
    }
408
    if (lbHelper != null) {
1✔
409
      return;
1✔
410
    }
411
    channelLogger.log(ChannelLogLevel.INFO, "Exiting idle mode");
1✔
412
    LbHelperImpl lbHelper = new LbHelperImpl();
1✔
413
    lbHelper.lb = loadBalancerFactory.newLoadBalancer(lbHelper);
1✔
414
    // Delay setting lbHelper until fully initialized, since loadBalancerFactory is user code and
415
    // may throw. We don't want to confuse our state, even if we will enter panic mode.
416
    this.lbHelper = lbHelper;
1✔
417

418
    NameResolverListener listener = new NameResolverListener(lbHelper, nameResolver);
1✔
419
    nameResolver.start(listener);
1✔
420
    nameResolverStarted = true;
1✔
421
  }
1✔
422

423
  // Must be run from syncContext
424
  private void enterIdleMode() {
425
    // nameResolver and loadBalancer are guaranteed to be non-null.  If any of them were null,
426
    // either the idleModeTimer ran twice without exiting the idle mode, or the task in shutdown()
427
    // did not cancel idleModeTimer, or enterIdle() ran while shutdown or in idle, all of
428
    // which are bugs.
429
    shutdownNameResolverAndLoadBalancer(true);
1✔
430
    delayedTransport.reprocess(null);
1✔
431
    channelLogger.log(ChannelLogLevel.INFO, "Entering IDLE state");
1✔
432
    channelStateManager.gotoState(IDLE);
1✔
433
    // If the inUseStateAggregator still considers pending calls to be queued up or the delayed
434
    // transport to be holding some we need to exit idle mode to give these calls a chance to
435
    // be processed.
436
    if (inUseStateAggregator.anyObjectInUse(pendingCallsInUseObject, delayedTransport)) {
1✔
437
      exitIdleMode();
1✔
438
    }
439
  }
1✔
440

441
  // Must be run from syncContext
442
  private void cancelIdleTimer(boolean permanent) {
443
    idleTimer.cancel(permanent);
1✔
444
  }
1✔
445

446
  // Always run from syncContext
447
  private void rescheduleIdleTimer() {
448
    if (idleTimeoutMillis == IDLE_TIMEOUT_MILLIS_DISABLE) {
1✔
449
      return;
1✔
450
    }
451
    idleTimer.reschedule(idleTimeoutMillis, TimeUnit.MILLISECONDS);
1✔
452
  }
1✔
453

454
  /**
455
   * Force name resolution refresh to happen immediately. Must be run
456
   * from syncContext.
457
   */
458
  private void refreshNameResolution() {
459
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
460
    if (nameResolverStarted) {
1✔
461
      nameResolver.refresh();
1✔
462
    }
463
  }
1✔
464

465
  private final class ChannelStreamProvider implements ClientStreamProvider {
1✔
466
    volatile Throttle throttle;
467

468
    private ClientTransport getTransport(PickSubchannelArgs args) {
469
      SubchannelPicker pickerCopy = subchannelPicker;
1✔
470
      if (shutdown.get()) {
1✔
471
        // If channel is shut down, delayedTransport is also shut down which will fail the stream
472
        // properly.
473
        return delayedTransport;
1✔
474
      }
475
      if (pickerCopy == null) {
1✔
476
        final class ExitIdleModeForTransport implements Runnable {
1✔
477
          @Override
478
          public void run() {
479
            exitIdleMode();
1✔
480
          }
1✔
481
        }
482

483
        syncContext.execute(new ExitIdleModeForTransport());
1✔
484
        return delayedTransport;
1✔
485
      }
486
      // There is no need to reschedule the idle timer here.
487
      //
488
      // pickerCopy != null, which means idle timer has not expired when this method starts.
489
      // Even if idle timer expires right after we grab pickerCopy, and it shuts down LoadBalancer
490
      // which calls Subchannel.shutdown(), the InternalSubchannel will be actually shutdown after
491
      // SUBCHANNEL_SHUTDOWN_DELAY_SECONDS, which gives the caller time to start RPC on it.
492
      //
493
      // In most cases the idle timer is scheduled to fire after the transport has created the
494
      // stream, which would have reported in-use state to the channel that would have cancelled
495
      // the idle timer.
496
      PickResult pickResult = pickerCopy.pickSubchannel(args);
1✔
497
      ClientTransport transport = GrpcUtil.getTransportFromPickResult(
1✔
498
          pickResult, args.getCallOptions().isWaitForReady());
1✔
499
      if (transport != null) {
1✔
500
        return transport;
1✔
501
      }
502
      return delayedTransport;
1✔
503
    }
504

505
    @Override
506
    public ClientStream newStream(
507
        final MethodDescriptor<?, ?> method,
508
        final CallOptions callOptions,
509
        final Metadata headers,
510
        final Context context) {
511
      if (!retryEnabled) {
1✔
512
        ClientTransport transport =
1✔
513
            getTransport(new PickSubchannelArgsImpl(method, headers, callOptions));
1✔
514
        Context origContext = context.attach();
1✔
515
        ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers(
1✔
516
            callOptions, headers, 0, /* isTransparentRetry= */ false);
517
        try {
518
          return transport.newStream(method, headers, callOptions, tracers);
1✔
519
        } finally {
520
          context.detach(origContext);
1✔
521
        }
522
      } else {
523
        MethodInfo methodInfo = callOptions.getOption(MethodInfo.KEY);
1✔
524
        final RetryPolicy retryPolicy = methodInfo == null ? null : methodInfo.retryPolicy;
1✔
525
        final HedgingPolicy hedgingPolicy = methodInfo == null ? null : methodInfo.hedgingPolicy;
1✔
526
        final class RetryStream<ReqT> extends RetriableStream<ReqT> {
527
          @SuppressWarnings("unchecked")
528
          RetryStream() {
1✔
529
            super(
1✔
530
                (MethodDescriptor<ReqT, ?>) method,
531
                headers,
532
                channelBufferUsed,
1✔
533
                perRpcBufferLimit,
1✔
534
                channelBufferLimit,
1✔
535
                getCallExecutor(callOptions),
1✔
536
                transportFactory.getScheduledExecutorService(),
1✔
537
                retryPolicy,
538
                hedgingPolicy,
539
                throttle);
540
          }
1✔
541

542
          @Override
543
          Status prestart() {
544
            return uncommittedRetriableStreamsRegistry.add(this);
1✔
545
          }
546

547
          @Override
548
          void postCommit() {
549
            uncommittedRetriableStreamsRegistry.remove(this);
1✔
550
          }
1✔
551

552
          @Override
553
          ClientStream newSubstream(
554
              Metadata newHeaders, ClientStreamTracer.Factory factory, int previousAttempts,
555
              boolean isTransparentRetry) {
556
            CallOptions newOptions = callOptions.withStreamTracerFactory(factory);
1✔
557
            ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers(
1✔
558
                newOptions, newHeaders, previousAttempts, isTransparentRetry);
559
            ClientTransport transport =
1✔
560
                getTransport(new PickSubchannelArgsImpl(method, newHeaders, newOptions));
1✔
561
            Context origContext = context.attach();
1✔
562
            try {
563
              return transport.newStream(method, newHeaders, newOptions, tracers);
1✔
564
            } finally {
565
              context.detach(origContext);
1✔
566
            }
567
          }
568
        }
569

570
        return new RetryStream<>();
1✔
571
      }
572
    }
573
  }
574

575
  private final ChannelStreamProvider transportProvider = new ChannelStreamProvider();
1✔
576

577
  private final Rescheduler idleTimer;
578

579
  ManagedChannelImpl(
580
      ManagedChannelImplBuilder builder,
581
      ClientTransportFactory clientTransportFactory,
582
      BackoffPolicy.Provider backoffPolicyProvider,
583
      ObjectPool<? extends Executor> balancerRpcExecutorPool,
584
      Supplier<Stopwatch> stopwatchSupplier,
585
      List<ClientInterceptor> interceptors,
586
      final TimeProvider timeProvider) {
1✔
587
    this.target = checkNotNull(builder.target, "target");
1✔
588
    this.logId = InternalLogId.allocate("Channel", target);
1✔
589
    this.timeProvider = checkNotNull(timeProvider, "timeProvider");
1✔
590
    this.executorPool = checkNotNull(builder.executorPool, "executorPool");
1✔
591
    this.executor = checkNotNull(executorPool.getObject(), "executor");
1✔
592
    this.originalChannelCreds = builder.channelCredentials;
1✔
593
    this.originalTransportFactory = clientTransportFactory;
1✔
594
    this.offloadExecutorHolder =
1✔
595
        new ExecutorHolder(checkNotNull(builder.offloadExecutorPool, "offloadExecutorPool"));
1✔
596
    this.transportFactory = new CallCredentialsApplyingTransportFactory(
1✔
597
        clientTransportFactory, builder.callCredentials, this.offloadExecutorHolder);
598
    this.oobTransportFactory = new CallCredentialsApplyingTransportFactory(
1✔
599
        clientTransportFactory, null, this.offloadExecutorHolder);
600
    this.scheduledExecutor =
1✔
601
        new RestrictedScheduledExecutor(transportFactory.getScheduledExecutorService());
1✔
602
    maxTraceEvents = builder.maxTraceEvents;
1✔
603
    channelTracer = new ChannelTracer(
1✔
604
        logId, builder.maxTraceEvents, timeProvider.currentTimeNanos(),
1✔
605
        "Channel for '" + target + "'");
606
    channelLogger = new ChannelLoggerImpl(channelTracer, timeProvider);
1✔
607
    ProxyDetector proxyDetector =
608
        builder.proxyDetector != null ? builder.proxyDetector : GrpcUtil.DEFAULT_PROXY_DETECTOR;
1✔
609
    this.retryEnabled = builder.retryEnabled;
1✔
610
    this.loadBalancerFactory = new AutoConfiguredLoadBalancerFactory(builder.defaultLbPolicy);
1✔
611
    this.nameResolverRegistry = builder.nameResolverRegistry;
1✔
612
    ScParser serviceConfigParser =
1✔
613
        new ScParser(
614
            retryEnabled,
615
            builder.maxRetryAttempts,
616
            builder.maxHedgedAttempts,
617
            loadBalancerFactory);
618
    this.authorityOverride = builder.authorityOverride;
1✔
619
    this.nameResolverArgs =
1✔
620
        NameResolver.Args.newBuilder()
1✔
621
            .setDefaultPort(builder.getDefaultPort())
1✔
622
            .setProxyDetector(proxyDetector)
1✔
623
            .setSynchronizationContext(syncContext)
1✔
624
            .setScheduledExecutorService(scheduledExecutor)
1✔
625
            .setServiceConfigParser(serviceConfigParser)
1✔
626
            .setChannelLogger(channelLogger)
1✔
627
            .setOffloadExecutor(this.offloadExecutorHolder)
1✔
628
            .setOverrideAuthority(this.authorityOverride)
1✔
629
            .build();
1✔
630
    this.nameResolverFactory = builder.nameResolverFactory;
1✔
631
    this.nameResolver = getNameResolver(
1✔
632
        target, authorityOverride, nameResolverFactory, nameResolverArgs);
633
    this.balancerRpcExecutorPool = checkNotNull(balancerRpcExecutorPool, "balancerRpcExecutorPool");
1✔
634
    this.balancerRpcExecutorHolder = new ExecutorHolder(balancerRpcExecutorPool);
1✔
635
    this.delayedTransport = new DelayedClientTransport(this.executor, this.syncContext);
1✔
636
    this.delayedTransport.start(delayedTransportListener);
1✔
637
    this.backoffPolicyProvider = backoffPolicyProvider;
1✔
638

639
    if (builder.defaultServiceConfig != null) {
1✔
640
      ConfigOrError parsedDefaultServiceConfig =
1✔
641
          serviceConfigParser.parseServiceConfig(builder.defaultServiceConfig);
1✔
642
      checkState(
1✔
643
          parsedDefaultServiceConfig.getError() == null,
1✔
644
          "Default config is invalid: %s",
645
          parsedDefaultServiceConfig.getError());
1✔
646
      this.defaultServiceConfig =
1✔
647
          (ManagedChannelServiceConfig) parsedDefaultServiceConfig.getConfig();
1✔
648
      this.lastServiceConfig = this.defaultServiceConfig;
1✔
649
    } else {
1✔
650
      this.defaultServiceConfig = null;
1✔
651
    }
652
    this.lookUpServiceConfig = builder.lookUpServiceConfig;
1✔
653
    realChannel = new RealChannel(nameResolver.getServiceAuthority());
1✔
654
    Channel channel = realChannel;
1✔
655
    if (builder.binlog != null) {
1✔
656
      channel = builder.binlog.wrapChannel(channel);
1✔
657
    }
658
    this.interceptorChannel = ClientInterceptors.intercept(channel, interceptors);
1✔
659
    this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier");
1✔
660
    if (builder.idleTimeoutMillis == IDLE_TIMEOUT_MILLIS_DISABLE) {
1✔
661
      this.idleTimeoutMillis = builder.idleTimeoutMillis;
1✔
662
    } else {
663
      checkArgument(
1✔
664
          builder.idleTimeoutMillis
665
              >= ManagedChannelImplBuilder.IDLE_MODE_MIN_TIMEOUT_MILLIS,
666
          "invalid idleTimeoutMillis %s", builder.idleTimeoutMillis);
667
      this.idleTimeoutMillis = builder.idleTimeoutMillis;
1✔
668
    }
669

670
    idleTimer = new Rescheduler(
1✔
671
        new IdleModeTimer(),
672
        syncContext,
673
        transportFactory.getScheduledExecutorService(),
1✔
674
        stopwatchSupplier.get());
1✔
675
    this.fullStreamDecompression = builder.fullStreamDecompression;
1✔
676
    this.decompressorRegistry = checkNotNull(builder.decompressorRegistry, "decompressorRegistry");
1✔
677
    this.compressorRegistry = checkNotNull(builder.compressorRegistry, "compressorRegistry");
1✔
678
    this.userAgent = builder.userAgent;
1✔
679

680
    this.channelBufferLimit = builder.retryBufferSize;
1✔
681
    this.perRpcBufferLimit = builder.perRpcBufferLimit;
1✔
682
    final class ChannelCallTracerFactory implements CallTracer.Factory {
1✔
683
      @Override
684
      public CallTracer create() {
685
        return new CallTracer(timeProvider);
1✔
686
      }
687
    }
688

689
    this.callTracerFactory = new ChannelCallTracerFactory();
1✔
690
    channelCallTracer = callTracerFactory.create();
1✔
691
    this.channelz = checkNotNull(builder.channelz);
1✔
692
    channelz.addRootChannel(this);
1✔
693

694
    if (!lookUpServiceConfig) {
1✔
695
      if (defaultServiceConfig != null) {
1✔
696
        channelLogger.log(
1✔
697
            ChannelLogLevel.INFO, "Service config look-up disabled, using default service config");
698
      }
699
      serviceConfigUpdated = true;
1✔
700
    }
701
  }
1✔
702

703
  private static NameResolver getNameResolver(
704
      String target, NameResolver.Factory nameResolverFactory, NameResolver.Args nameResolverArgs) {
705
    // Finding a NameResolver. Try using the target string as the URI. If that fails, try prepending
706
    // "dns:///".
707
    URI targetUri = null;
1✔
708
    StringBuilder uriSyntaxErrors = new StringBuilder();
1✔
709
    try {
710
      targetUri = new URI(target);
1✔
711
      // For "localhost:8080" this would likely cause newNameResolver to return null, because
712
      // "localhost" is parsed as the scheme. Will fall into the next branch and try
713
      // "dns:///localhost:8080".
714
    } catch (URISyntaxException e) {
1✔
715
      // Can happen with ip addresses like "[::1]:1234" or 127.0.0.1:1234.
716
      uriSyntaxErrors.append(e.getMessage());
1✔
717
    }
1✔
718
    if (targetUri != null) {
1✔
719
      NameResolver resolver = nameResolverFactory.newNameResolver(targetUri, nameResolverArgs);
1✔
720
      if (resolver != null) {
1✔
721
        return resolver;
1✔
722
      }
723
      // "foo.googleapis.com:8080" cause resolver to be null, because "foo.googleapis.com" is an
724
      // unmapped scheme. Just fall through and will try "dns:///foo.googleapis.com:8080"
725
    }
726

727
    // If we reached here, the targetUri couldn't be used.
728
    if (!URI_PATTERN.matcher(target).matches()) {
1✔
729
      // It doesn't look like a URI target. Maybe it's an authority string. Try with the default
730
      // scheme from the factory.
731
      try {
732
        targetUri = new URI(nameResolverFactory.getDefaultScheme(), "", "/" + target, null);
1✔
733
      } catch (URISyntaxException e) {
×
734
        // Should not be possible.
735
        throw new IllegalArgumentException(e);
×
736
      }
1✔
737
      NameResolver resolver = nameResolverFactory.newNameResolver(targetUri, nameResolverArgs);
1✔
738
      if (resolver != null) {
1✔
739
        return resolver;
1✔
740
      }
741
    }
742
    throw new IllegalArgumentException(String.format(
1✔
743
        "cannot find a NameResolver for %s%s",
744
        target, uriSyntaxErrors.length() > 0 ? " (" + uriSyntaxErrors + ")" : ""));
1✔
745
  }
746

747
  @VisibleForTesting
748
  static NameResolver getNameResolver(
749
      String target, @Nullable final String overrideAuthority,
750
      NameResolver.Factory nameResolverFactory, NameResolver.Args nameResolverArgs) {
751
    NameResolver resolver = getNameResolver(target, nameResolverFactory, nameResolverArgs);
1✔
752

753
    // We wrap the name resolver in a RetryingNameResolver to give it the ability to retry failures.
754
    // TODO: After a transition period, all NameResolver implementations that need retry should use
755
    //       RetryingNameResolver directly and this step can be removed.
756
    NameResolver usedNameResolver = new RetryingNameResolver(resolver,
1✔
757
          new BackoffPolicyRetryScheduler(new ExponentialBackoffPolicy.Provider(),
758
              nameResolverArgs.getScheduledExecutorService(),
1✔
759
              nameResolverArgs.getSynchronizationContext()),
1✔
760
          nameResolverArgs.getSynchronizationContext());
1✔
761

762
    if (overrideAuthority == null) {
1✔
763
      return usedNameResolver;
1✔
764
    }
765

766
    return new ForwardingNameResolver(usedNameResolver) {
1✔
767
      @Override
768
      public String getServiceAuthority() {
769
        return overrideAuthority;
1✔
770
      }
771
    };
772
  }
773

774
  @VisibleForTesting
775
  InternalConfigSelector getConfigSelector() {
776
    return realChannel.configSelector.get();
1✔
777
  }
778

779
  /**
780
   * Initiates an orderly shutdown in which preexisting calls continue but new calls are immediately
781
   * cancelled.
782
   */
783
  @Override
784
  public ManagedChannelImpl shutdown() {
785
    channelLogger.log(ChannelLogLevel.DEBUG, "shutdown() called");
1✔
786
    if (!shutdown.compareAndSet(false, true)) {
1✔
787
      return this;
1✔
788
    }
789
    final class Shutdown implements Runnable {
1✔
790
      @Override
791
      public void run() {
792
        channelLogger.log(ChannelLogLevel.INFO, "Entering SHUTDOWN state");
1✔
793
        channelStateManager.gotoState(SHUTDOWN);
1✔
794
      }
1✔
795
    }
796

797
    syncContext.execute(new Shutdown());
1✔
798
    realChannel.shutdown();
1✔
799
    final class CancelIdleTimer implements Runnable {
1✔
800
      @Override
801
      public void run() {
802
        cancelIdleTimer(/* permanent= */ true);
1✔
803
      }
1✔
804
    }
805

806
    syncContext.execute(new CancelIdleTimer());
1✔
807
    return this;
1✔
808
  }
809

810
  /**
811
   * Initiates a forceful shutdown in which preexisting and new calls are cancelled. Although
812
   * forceful, the shutdown process is still not instantaneous; {@link #isTerminated()} will likely
813
   * return {@code false} immediately after this method returns.
814
   */
815
  @Override
816
  public ManagedChannelImpl shutdownNow() {
817
    channelLogger.log(ChannelLogLevel.DEBUG, "shutdownNow() called");
1✔
818
    shutdown();
1✔
819
    realChannel.shutdownNow();
1✔
820
    final class ShutdownNow implements Runnable {
1✔
821
      @Override
822
      public void run() {
823
        if (shutdownNowed) {
1✔
824
          return;
1✔
825
        }
826
        shutdownNowed = true;
1✔
827
        maybeShutdownNowSubchannels();
1✔
828
      }
1✔
829
    }
830

831
    syncContext.execute(new ShutdownNow());
1✔
832
    return this;
1✔
833
  }
834

835
  // Called from syncContext
836
  @VisibleForTesting
837
  void panic(final Throwable t) {
838
    if (panicMode) {
1✔
839
      // Preserve the first panic information
840
      return;
×
841
    }
842
    panicMode = true;
1✔
843
    cancelIdleTimer(/* permanent= */ true);
1✔
844
    shutdownNameResolverAndLoadBalancer(false);
1✔
845
    final class PanicSubchannelPicker extends SubchannelPicker {
1✔
846
      private final PickResult panicPickResult =
1✔
847
          PickResult.withDrop(
1✔
848
              Status.INTERNAL.withDescription("Panic! This is a bug!").withCause(t));
1✔
849

850
      @Override
851
      public PickResult pickSubchannel(PickSubchannelArgs args) {
852
        return panicPickResult;
1✔
853
      }
854

855
      @Override
856
      public String toString() {
857
        return MoreObjects.toStringHelper(PanicSubchannelPicker.class)
×
858
            .add("panicPickResult", panicPickResult)
×
859
            .toString();
×
860
      }
861
    }
862

863
    updateSubchannelPicker(new PanicSubchannelPicker());
1✔
864
    realChannel.updateConfigSelector(null);
1✔
865
    channelLogger.log(ChannelLogLevel.ERROR, "PANIC! Entering TRANSIENT_FAILURE");
1✔
866
    channelStateManager.gotoState(TRANSIENT_FAILURE);
1✔
867
  }
1✔
868

869
  @VisibleForTesting
870
  boolean isInPanicMode() {
871
    return panicMode;
1✔
872
  }
873

874
  // Called from syncContext
875
  private void updateSubchannelPicker(SubchannelPicker newPicker) {
876
    subchannelPicker = newPicker;
1✔
877
    delayedTransport.reprocess(newPicker);
1✔
878
  }
1✔
879

880
  @Override
881
  public boolean isShutdown() {
882
    return shutdown.get();
1✔
883
  }
884

885
  @Override
886
  public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
887
    return terminatedLatch.await(timeout, unit);
1✔
888
  }
889

890
  @Override
891
  public boolean isTerminated() {
892
    return terminated;
1✔
893
  }
894

895
  /*
896
   * Creates a new outgoing call on the channel.
897
   */
898
  @Override
899
  public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(MethodDescriptor<ReqT, RespT> method,
900
      CallOptions callOptions) {
901
    return interceptorChannel.newCall(method, callOptions);
1✔
902
  }
903

904
  @Override
905
  public String authority() {
906
    return interceptorChannel.authority();
1✔
907
  }
908

909
  private Executor getCallExecutor(CallOptions callOptions) {
910
    Executor executor = callOptions.getExecutor();
1✔
911
    if (executor == null) {
1✔
912
      executor = this.executor;
1✔
913
    }
914
    return executor;
1✔
915
  }
916

917
  private class RealChannel extends Channel {
918
    // Reference to null if no config selector is available from resolution result
919
    // Reference must be set() from syncContext
920
    private final AtomicReference<InternalConfigSelector> configSelector =
1✔
921
        new AtomicReference<>(INITIAL_PENDING_SELECTOR);
1✔
922
    // Set when the NameResolver is initially created. When we create a new NameResolver for the
923
    // same target, the new instance must have the same value.
924
    private final String authority;
925

926
    private final Channel clientCallImplChannel = new Channel() {
1✔
927
      @Override
928
      public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
929
          MethodDescriptor<RequestT, ResponseT> method, CallOptions callOptions) {
930
        return new ClientCallImpl<>(
1✔
931
            method,
932
            getCallExecutor(callOptions),
1✔
933
            callOptions,
934
            transportProvider,
1✔
935
            terminated ? null : transportFactory.getScheduledExecutorService(),
1✔
936
            channelCallTracer,
1✔
937
            null)
938
            .setFullStreamDecompression(fullStreamDecompression)
1✔
939
            .setDecompressorRegistry(decompressorRegistry)
1✔
940
            .setCompressorRegistry(compressorRegistry);
1✔
941
      }
942

943
      @Override
944
      public String authority() {
945
        return authority;
×
946
      }
947
    };
948

949
    private RealChannel(String authority) {
1✔
950
      this.authority =  checkNotNull(authority, "authority");
1✔
951
    }
1✔
952

953
    @Override
954
    public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(
955
        MethodDescriptor<ReqT, RespT> method, CallOptions callOptions) {
956
      if (configSelector.get() != INITIAL_PENDING_SELECTOR) {
1✔
957
        return newClientCall(method, callOptions);
1✔
958
      }
959
      syncContext.execute(new Runnable() {
1✔
960
        @Override
961
        public void run() {
962
          exitIdleMode();
1✔
963
        }
1✔
964
      });
965
      if (configSelector.get() != INITIAL_PENDING_SELECTOR) {
1✔
966
        // This is an optimization for the case (typically with InProcessTransport) when name
967
        // resolution result is immediately available at this point. Otherwise, some users'
968
        // tests might observe slight behavior difference from earlier grpc versions.
969
        return newClientCall(method, callOptions);
1✔
970
      }
971
      if (shutdown.get()) {
1✔
972
        // Return a failing ClientCall.
973
        return new ClientCall<ReqT, RespT>() {
×
974
          @Override
975
          public void start(Listener<RespT> responseListener, Metadata headers) {
976
            responseListener.onClose(SHUTDOWN_STATUS, new Metadata());
×
977
          }
×
978

979
          @Override public void request(int numMessages) {}
×
980

981
          @Override public void cancel(@Nullable String message, @Nullable Throwable cause) {}
×
982

983
          @Override public void halfClose() {}
×
984

985
          @Override public void sendMessage(ReqT message) {}
×
986
        };
987
      }
988
      Context context = Context.current();
1✔
989
      final PendingCall<ReqT, RespT> pendingCall = new PendingCall<>(context, method, callOptions);
1✔
990
      syncContext.execute(new Runnable() {
1✔
991
        @Override
992
        public void run() {
993
          if (configSelector.get() == INITIAL_PENDING_SELECTOR) {
1✔
994
            if (pendingCalls == null) {
1✔
995
              pendingCalls = new LinkedHashSet<>();
1✔
996
              inUseStateAggregator.updateObjectInUse(pendingCallsInUseObject, true);
1✔
997
            }
998
            pendingCalls.add(pendingCall);
1✔
999
          } else {
1000
            pendingCall.reprocess();
1✔
1001
          }
1002
        }
1✔
1003
      });
1004
      return pendingCall;
1✔
1005
    }
1006

1007
    // Must run in SynchronizationContext.
1008
    void updateConfigSelector(@Nullable InternalConfigSelector config) {
1009
      InternalConfigSelector prevConfig = configSelector.get();
1✔
1010
      configSelector.set(config);
1✔
1011
      if (prevConfig == INITIAL_PENDING_SELECTOR && pendingCalls != null) {
1✔
1012
        for (RealChannel.PendingCall<?, ?> pendingCall : pendingCalls) {
1✔
1013
          pendingCall.reprocess();
1✔
1014
        }
1✔
1015
      }
1016
    }
1✔
1017

1018
    // Must run in SynchronizationContext.
1019
    void onConfigError() {
1020
      if (configSelector.get() == INITIAL_PENDING_SELECTOR) {
1✔
1021
        updateConfigSelector(null);
1✔
1022
      }
1023
    }
1✔
1024

1025
    void shutdown() {
1026
      final class RealChannelShutdown implements Runnable {
1✔
1027
        @Override
1028
        public void run() {
1029
          if (pendingCalls == null) {
1✔
1030
            if (configSelector.get() == INITIAL_PENDING_SELECTOR) {
1✔
1031
              configSelector.set(null);
1✔
1032
            }
1033
            uncommittedRetriableStreamsRegistry.onShutdown(SHUTDOWN_STATUS);
1✔
1034
          }
1035
        }
1✔
1036
      }
1037

1038
      syncContext.execute(new RealChannelShutdown());
1✔
1039
    }
1✔
1040

1041
    void shutdownNow() {
1042
      final class RealChannelShutdownNow implements Runnable {
1✔
1043
        @Override
1044
        public void run() {
1045
          if (configSelector.get() == INITIAL_PENDING_SELECTOR) {
1✔
1046
            configSelector.set(null);
1✔
1047
          }
1048
          if (pendingCalls != null) {
1✔
1049
            for (RealChannel.PendingCall<?, ?> pendingCall : pendingCalls) {
1✔
1050
              pendingCall.cancel("Channel is forcefully shutdown", null);
1✔
1051
            }
1✔
1052
          }
1053
          uncommittedRetriableStreamsRegistry.onShutdownNow(SHUTDOWN_NOW_STATUS);
1✔
1054
        }
1✔
1055
      }
1056

1057
      syncContext.execute(new RealChannelShutdownNow());
1✔
1058
    }
1✔
1059

1060
    @Override
1061
    public String authority() {
1062
      return authority;
1✔
1063
    }
1064

1065
    private final class PendingCall<ReqT, RespT> extends DelayedClientCall<ReqT, RespT> {
1066
      final Context context;
1067
      final MethodDescriptor<ReqT, RespT> method;
1068
      final CallOptions callOptions;
1069

1070
      PendingCall(
1071
          Context context, MethodDescriptor<ReqT, RespT> method, CallOptions callOptions) {
1✔
1072
        super(getCallExecutor(callOptions), scheduledExecutor, callOptions.getDeadline());
1✔
1073
        this.context = context;
1✔
1074
        this.method = method;
1✔
1075
        this.callOptions = callOptions;
1✔
1076
      }
1✔
1077

1078
      /** Called when it's ready to create a real call and reprocess the pending call. */
1079
      void reprocess() {
1080
        ClientCall<ReqT, RespT> realCall;
1081
        Context previous = context.attach();
1✔
1082
        try {
1083
          CallOptions delayResolutionOption = callOptions.withOption(NAME_RESOLUTION_DELAYED, true);
1✔
1084
          realCall = newClientCall(method, delayResolutionOption);
1✔
1085
        } finally {
1086
          context.detach(previous);
1✔
1087
        }
1088
        Runnable toRun = setCall(realCall);
1✔
1089
        if (toRun == null) {
1✔
1090
          syncContext.execute(new PendingCallRemoval());
1✔
1091
        } else {
1092
          getCallExecutor(callOptions).execute(new Runnable() {
1✔
1093
            @Override
1094
            public void run() {
1095
              toRun.run();
1✔
1096
              syncContext.execute(new PendingCallRemoval());
1✔
1097
            }
1✔
1098
          });
1099
        }
1100
      }
1✔
1101

1102
      @Override
1103
      protected void callCancelled() {
1104
        super.callCancelled();
1✔
1105
        syncContext.execute(new PendingCallRemoval());
1✔
1106
      }
1✔
1107

1108
      final class PendingCallRemoval implements Runnable {
1✔
1109
        @Override
1110
        public void run() {
1111
          if (pendingCalls != null) {
1✔
1112
            pendingCalls.remove(PendingCall.this);
1✔
1113
            if (pendingCalls.isEmpty()) {
1✔
1114
              inUseStateAggregator.updateObjectInUse(pendingCallsInUseObject, false);
1✔
1115
              pendingCalls = null;
1✔
1116
              if (shutdown.get()) {
1✔
1117
                uncommittedRetriableStreamsRegistry.onShutdown(SHUTDOWN_STATUS);
1✔
1118
              }
1119
            }
1120
          }
1121
        }
1✔
1122
      }
1123
    }
1124

1125
    private <ReqT, RespT> ClientCall<ReqT, RespT> newClientCall(
1126
        MethodDescriptor<ReqT, RespT> method, CallOptions callOptions) {
1127
      InternalConfigSelector selector = configSelector.get();
1✔
1128
      if (selector == null) {
1✔
1129
        return clientCallImplChannel.newCall(method, callOptions);
1✔
1130
      }
1131
      if (selector instanceof ServiceConfigConvertedSelector) {
1✔
1132
        MethodInfo methodInfo =
1✔
1133
            ((ServiceConfigConvertedSelector) selector).config.getMethodConfig(method);
1✔
1134
        if (methodInfo != null) {
1✔
1135
          callOptions = callOptions.withOption(MethodInfo.KEY, methodInfo);
1✔
1136
        }
1137
        return clientCallImplChannel.newCall(method, callOptions);
1✔
1138
      }
1139
      return new ConfigSelectingClientCall<>(
1✔
1140
          selector, clientCallImplChannel, executor, method, callOptions);
1✔
1141
    }
1142
  }
1143

1144
  /**
1145
   * A client call for a given channel that applies a given config selector when it starts.
1146
   */
1147
  static final class ConfigSelectingClientCall<ReqT, RespT>
1148
      extends ForwardingClientCall<ReqT, RespT> {
1149

1150
    private final InternalConfigSelector configSelector;
1151
    private final Channel channel;
1152
    private final Executor callExecutor;
1153
    private final MethodDescriptor<ReqT, RespT> method;
1154
    private final Context context;
1155
    private CallOptions callOptions;
1156

1157
    private ClientCall<ReqT, RespT> delegate;
1158

1159
    ConfigSelectingClientCall(
1160
        InternalConfigSelector configSelector, Channel channel, Executor channelExecutor,
1161
        MethodDescriptor<ReqT, RespT> method,
1162
        CallOptions callOptions) {
1✔
1163
      this.configSelector = configSelector;
1✔
1164
      this.channel = channel;
1✔
1165
      this.method = method;
1✔
1166
      this.callExecutor =
1✔
1167
          callOptions.getExecutor() == null ? channelExecutor : callOptions.getExecutor();
1✔
1168
      this.callOptions = callOptions.withExecutor(callExecutor);
1✔
1169
      this.context = Context.current();
1✔
1170
    }
1✔
1171

1172
    @Override
1173
    protected ClientCall<ReqT, RespT> delegate() {
1174
      return delegate;
1✔
1175
    }
1176

1177
    @SuppressWarnings("unchecked")
1178
    @Override
1179
    public void start(Listener<RespT> observer, Metadata headers) {
1180
      PickSubchannelArgs args = new PickSubchannelArgsImpl(method, headers, callOptions);
1✔
1181
      InternalConfigSelector.Result result = configSelector.selectConfig(args);
1✔
1182
      Status status = result.getStatus();
1✔
1183
      if (!status.isOk()) {
1✔
1184
        executeCloseObserverInContext(observer,
1✔
1185
            GrpcUtil.replaceInappropriateControlPlaneStatus(status));
1✔
1186
        delegate = (ClientCall<ReqT, RespT>) NOOP_CALL;
1✔
1187
        return;
1✔
1188
      }
1189
      ClientInterceptor interceptor = result.getInterceptor();
1✔
1190
      ManagedChannelServiceConfig config = (ManagedChannelServiceConfig) result.getConfig();
1✔
1191
      MethodInfo methodInfo = config.getMethodConfig(method);
1✔
1192
      if (methodInfo != null) {
1✔
1193
        callOptions = callOptions.withOption(MethodInfo.KEY, methodInfo);
1✔
1194
      }
1195
      if (interceptor != null) {
1✔
1196
        delegate = interceptor.interceptCall(method, callOptions, channel);
1✔
1197
      } else {
1198
        delegate = channel.newCall(method, callOptions);
×
1199
      }
1200
      delegate.start(observer, headers);
1✔
1201
    }
1✔
1202

1203
    private void executeCloseObserverInContext(
1204
        final Listener<RespT> observer, final Status status) {
1205
      class CloseInContext extends ContextRunnable {
1206
        CloseInContext() {
1✔
1207
          super(context);
1✔
1208
        }
1✔
1209

1210
        @Override
1211
        public void runInContext() {
1212
          observer.onClose(status, new Metadata());
1✔
1213
        }
1✔
1214
      }
1215

1216
      callExecutor.execute(new CloseInContext());
1✔
1217
    }
1✔
1218

1219
    @Override
1220
    public void cancel(@Nullable String message, @Nullable Throwable cause) {
1221
      if (delegate != null) {
×
1222
        delegate.cancel(message, cause);
×
1223
      }
1224
    }
×
1225
  }
1226

1227
  private static final ClientCall<Object, Object> NOOP_CALL = new ClientCall<Object, Object>() {
1✔
1228
    @Override
1229
    public void start(Listener<Object> responseListener, Metadata headers) {}
×
1230

1231
    @Override
1232
    public void request(int numMessages) {}
1✔
1233

1234
    @Override
1235
    public void cancel(String message, Throwable cause) {}
×
1236

1237
    @Override
1238
    public void halfClose() {}
×
1239

1240
    @Override
1241
    public void sendMessage(Object message) {}
×
1242

1243
    // Always returns {@code false}, since this is only used when the startup of the call fails.
1244
    @Override
1245
    public boolean isReady() {
1246
      return false;
×
1247
    }
1248
  };
1249

1250
  /**
1251
   * Terminate the channel if termination conditions are met.
1252
   */
1253
  // Must be run from syncContext
1254
  private void maybeTerminateChannel() {
1255
    if (terminated) {
1✔
1256
      return;
×
1257
    }
1258
    if (shutdown.get() && subchannels.isEmpty() && oobChannels.isEmpty()) {
1✔
1259
      channelLogger.log(ChannelLogLevel.INFO, "Terminated");
1✔
1260
      channelz.removeRootChannel(this);
1✔
1261
      executorPool.returnObject(executor);
1✔
1262
      balancerRpcExecutorHolder.release();
1✔
1263
      offloadExecutorHolder.release();
1✔
1264
      // Release the transport factory so that it can deallocate any resources.
1265
      transportFactory.close();
1✔
1266

1267
      terminated = true;
1✔
1268
      terminatedLatch.countDown();
1✔
1269
    }
1270
  }
1✔
1271

1272
  // Must be called from syncContext
1273
  private void handleInternalSubchannelState(ConnectivityStateInfo newState) {
1274
    if (newState.getState() == TRANSIENT_FAILURE || newState.getState() == IDLE) {
1✔
1275
      refreshNameResolution();
1✔
1276
    }
1277
  }
1✔
1278

1279
  @Override
1280
  @SuppressWarnings("deprecation")
1281
  public ConnectivityState getState(boolean requestConnection) {
1282
    ConnectivityState savedChannelState = channelStateManager.getState();
1✔
1283
    if (requestConnection && savedChannelState == IDLE) {
1✔
1284
      final class RequestConnection implements Runnable {
1✔
1285
        @Override
1286
        public void run() {
1287
          exitIdleMode();
1✔
1288
          if (subchannelPicker != null) {
1✔
1289
            subchannelPicker.requestConnection();
1✔
1290
          }
1291
          if (lbHelper != null) {
1✔
1292
            lbHelper.lb.requestConnection();
1✔
1293
          }
1294
        }
1✔
1295
      }
1296

1297
      syncContext.execute(new RequestConnection());
1✔
1298
    }
1299
    return savedChannelState;
1✔
1300
  }
1301

1302
  @Override
1303
  public void notifyWhenStateChanged(final ConnectivityState source, final Runnable callback) {
1304
    final class NotifyStateChanged implements Runnable {
1✔
1305
      @Override
1306
      public void run() {
1307
        channelStateManager.notifyWhenStateChanged(callback, executor, source);
1✔
1308
      }
1✔
1309
    }
1310

1311
    syncContext.execute(new NotifyStateChanged());
1✔
1312
  }
1✔
1313

1314
  @Override
1315
  public void resetConnectBackoff() {
1316
    final class ResetConnectBackoff implements Runnable {
1✔
1317
      @Override
1318
      public void run() {
1319
        if (shutdown.get()) {
1✔
1320
          return;
1✔
1321
        }
1322
        if (nameResolverStarted) {
1✔
1323
          refreshNameResolution();
1✔
1324
        }
1325
        for (InternalSubchannel subchannel : subchannels) {
1✔
1326
          subchannel.resetConnectBackoff();
1✔
1327
        }
1✔
1328
        for (OobChannel oobChannel : oobChannels) {
1✔
1329
          oobChannel.resetConnectBackoff();
×
1330
        }
×
1331
      }
1✔
1332
    }
1333

1334
    syncContext.execute(new ResetConnectBackoff());
1✔
1335
  }
1✔
1336

1337
  @Override
1338
  public void enterIdle() {
1339
    final class PrepareToLoseNetworkRunnable implements Runnable {
1✔
1340
      @Override
1341
      public void run() {
1342
        if (shutdown.get() || lbHelper == null) {
1✔
1343
          return;
1✔
1344
        }
1345
        cancelIdleTimer(/* permanent= */ false);
1✔
1346
        enterIdleMode();
1✔
1347
      }
1✔
1348
    }
1349

1350
    syncContext.execute(new PrepareToLoseNetworkRunnable());
1✔
1351
  }
1✔
1352

1353
  /**
1354
   * A registry that prevents channel shutdown from killing existing retry attempts that are in
1355
   * backoff.
1356
   */
1357
  private final class UncommittedRetriableStreamsRegistry {
1✔
1358
    // TODO(zdapeng): This means we would acquire a lock for each new retry-able stream,
1359
    // it's worthwhile to look for a lock-free approach.
1360
    final Object lock = new Object();
1✔
1361

1362
    @GuardedBy("lock")
1✔
1363
    Collection<ClientStream> uncommittedRetriableStreams = new HashSet<>();
1364

1365
    @GuardedBy("lock")
1366
    Status shutdownStatus;
1367

1368
    void onShutdown(Status reason) {
1369
      boolean shouldShutdownDelayedTransport = false;
1✔
1370
      synchronized (lock) {
1✔
1371
        if (shutdownStatus != null) {
1✔
1372
          return;
1✔
1373
        }
1374
        shutdownStatus = reason;
1✔
1375
        // Keep the delayedTransport open until there is no more uncommitted streams, b/c those
1376
        // retriable streams, which may be in backoff and not using any transport, are already
1377
        // started RPCs.
1378
        if (uncommittedRetriableStreams.isEmpty()) {
1✔
1379
          shouldShutdownDelayedTransport = true;
1✔
1380
        }
1381
      }
1✔
1382

1383
      if (shouldShutdownDelayedTransport) {
1✔
1384
        delayedTransport.shutdown(reason);
1✔
1385
      }
1386
    }
1✔
1387

1388
    void onShutdownNow(Status reason) {
1389
      onShutdown(reason);
1✔
1390
      Collection<ClientStream> streams;
1391

1392
      synchronized (lock) {
1✔
1393
        streams = new ArrayList<>(uncommittedRetriableStreams);
1✔
1394
      }
1✔
1395

1396
      for (ClientStream stream : streams) {
1✔
1397
        stream.cancel(reason);
×
1398
      }
×
1399
      delayedTransport.shutdownNow(reason);
1✔
1400
    }
1✔
1401

1402
    /**
1403
     * Registers a RetriableStream and return null if not shutdown, otherwise just returns the
1404
     * shutdown Status.
1405
     */
1406
    @Nullable
1407
    Status add(RetriableStream<?> retriableStream) {
1408
      synchronized (lock) {
1✔
1409
        if (shutdownStatus != null) {
1✔
1410
          return shutdownStatus;
1✔
1411
        }
1412
        uncommittedRetriableStreams.add(retriableStream);
1✔
1413
        return null;
1✔
1414
      }
1415
    }
1416

1417
    void remove(RetriableStream<?> retriableStream) {
1418
      Status shutdownStatusCopy = null;
1✔
1419

1420
      synchronized (lock) {
1✔
1421
        uncommittedRetriableStreams.remove(retriableStream);
1✔
1422
        if (uncommittedRetriableStreams.isEmpty()) {
1✔
1423
          shutdownStatusCopy = shutdownStatus;
1✔
1424
          // Because retriable transport is long-lived, we take this opportunity to down-size the
1425
          // hashmap.
1426
          uncommittedRetriableStreams = new HashSet<>();
1✔
1427
        }
1428
      }
1✔
1429

1430
      if (shutdownStatusCopy != null) {
1✔
1431
        delayedTransport.shutdown(shutdownStatusCopy);
1✔
1432
      }
1433
    }
1✔
1434
  }
1435

1436
  private final class LbHelperImpl extends LoadBalancer.Helper {
1✔
1437
    AutoConfiguredLoadBalancer lb;
1438

1439
    @Override
1440
    public AbstractSubchannel createSubchannel(CreateSubchannelArgs args) {
1441
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1442
      // No new subchannel should be created after load balancer has been shutdown.
1443
      checkState(!terminating, "Channel is being terminated");
1✔
1444
      return new SubchannelImpl(args);
1✔
1445
    }
1446

1447
    @Override
1448
    public void updateBalancingState(
1449
        final ConnectivityState newState, final SubchannelPicker newPicker) {
1450
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1451
      checkNotNull(newState, "newState");
1✔
1452
      checkNotNull(newPicker, "newPicker");
1✔
1453
      final class UpdateBalancingState implements Runnable {
1✔
1454
        @Override
1455
        public void run() {
1456
          if (LbHelperImpl.this != lbHelper) {
1✔
1457
            return;
1✔
1458
          }
1459
          updateSubchannelPicker(newPicker);
1✔
1460
          // It's not appropriate to report SHUTDOWN state from lb.
1461
          // Ignore the case of newState == SHUTDOWN for now.
1462
          if (newState != SHUTDOWN) {
1✔
1463
            channelLogger.log(
1✔
1464
                ChannelLogLevel.INFO, "Entering {0} state with picker: {1}", newState, newPicker);
1465
            channelStateManager.gotoState(newState);
1✔
1466
          }
1467
        }
1✔
1468
      }
1469

1470
      syncContext.execute(new UpdateBalancingState());
1✔
1471
    }
1✔
1472

1473
    @Override
1474
    public void refreshNameResolution() {
1475
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1476
      final class LoadBalancerRefreshNameResolution implements Runnable {
1✔
1477
        @Override
1478
        public void run() {
1479
          ManagedChannelImpl.this.refreshNameResolution();
1✔
1480
        }
1✔
1481
      }
1482

1483
      syncContext.execute(new LoadBalancerRefreshNameResolution());
1✔
1484
    }
1✔
1485

1486
    @Override
1487
    public ManagedChannel createOobChannel(EquivalentAddressGroup addressGroup, String authority) {
1488
      return createOobChannel(Collections.singletonList(addressGroup), authority);
×
1489
    }
1490

1491
    @Override
1492
    public ManagedChannel createOobChannel(List<EquivalentAddressGroup> addressGroup,
1493
        String authority) {
1494
      // TODO(ejona): can we be even stricter? Like terminating?
1495
      checkState(!terminated, "Channel is terminated");
1✔
1496
      long oobChannelCreationTime = timeProvider.currentTimeNanos();
1✔
1497
      InternalLogId oobLogId = InternalLogId.allocate("OobChannel", /*details=*/ null);
1✔
1498
      InternalLogId subchannelLogId =
1✔
1499
          InternalLogId.allocate("Subchannel-OOB", /*details=*/ authority);
1✔
1500
      ChannelTracer oobChannelTracer =
1✔
1501
          new ChannelTracer(
1502
              oobLogId, maxTraceEvents, oobChannelCreationTime,
1✔
1503
              "OobChannel for " + addressGroup);
1504
      final OobChannel oobChannel = new OobChannel(
1✔
1505
          authority, balancerRpcExecutorPool, oobTransportFactory.getScheduledExecutorService(),
1✔
1506
          syncContext, callTracerFactory.create(), oobChannelTracer, channelz, timeProvider);
1✔
1507
      channelTracer.reportEvent(new ChannelTrace.Event.Builder()
1✔
1508
          .setDescription("Child OobChannel created")
1✔
1509
          .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
1✔
1510
          .setTimestampNanos(oobChannelCreationTime)
1✔
1511
          .setChannelRef(oobChannel)
1✔
1512
          .build());
1✔
1513
      ChannelTracer subchannelTracer =
1✔
1514
          new ChannelTracer(subchannelLogId, maxTraceEvents, oobChannelCreationTime,
1✔
1515
              "Subchannel for " + addressGroup);
1516
      ChannelLogger subchannelLogger = new ChannelLoggerImpl(subchannelTracer, timeProvider);
1✔
1517
      final class ManagedOobChannelCallback extends InternalSubchannel.Callback {
1✔
1518
        @Override
1519
        void onTerminated(InternalSubchannel is) {
1520
          oobChannels.remove(oobChannel);
1✔
1521
          channelz.removeSubchannel(is);
1✔
1522
          oobChannel.handleSubchannelTerminated();
1✔
1523
          maybeTerminateChannel();
1✔
1524
        }
1✔
1525

1526
        @Override
1527
        void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) {
1528
          // TODO(chengyuanzhang): change to let LB policies explicitly manage OOB channel's
1529
          //  state and refresh name resolution if necessary.
1530
          handleInternalSubchannelState(newState);
1✔
1531
          oobChannel.handleSubchannelStateChange(newState);
1✔
1532
        }
1✔
1533
      }
1534

1535
      final InternalSubchannel internalSubchannel = new InternalSubchannel(
1✔
1536
          addressGroup,
1537
          authority, userAgent, backoffPolicyProvider, oobTransportFactory,
1✔
1538
          oobTransportFactory.getScheduledExecutorService(), stopwatchSupplier, syncContext,
1✔
1539
          // All callback methods are run from syncContext
1540
          new ManagedOobChannelCallback(),
1541
          channelz,
1✔
1542
          callTracerFactory.create(),
1✔
1543
          subchannelTracer,
1544
          subchannelLogId,
1545
          subchannelLogger);
1546
      oobChannelTracer.reportEvent(new ChannelTrace.Event.Builder()
1✔
1547
          .setDescription("Child Subchannel created")
1✔
1548
          .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
1✔
1549
          .setTimestampNanos(oobChannelCreationTime)
1✔
1550
          .setSubchannelRef(internalSubchannel)
1✔
1551
          .build());
1✔
1552
      channelz.addSubchannel(oobChannel);
1✔
1553
      channelz.addSubchannel(internalSubchannel);
1✔
1554
      oobChannel.setSubchannel(internalSubchannel);
1✔
1555
      final class AddOobChannel implements Runnable {
1✔
1556
        @Override
1557
        public void run() {
1558
          if (terminating) {
1✔
1559
            oobChannel.shutdown();
×
1560
          }
1561
          if (!terminated) {
1✔
1562
            // If channel has not terminated, it will track the subchannel and block termination
1563
            // for it.
1564
            oobChannels.add(oobChannel);
1✔
1565
          }
1566
        }
1✔
1567
      }
1568

1569
      syncContext.execute(new AddOobChannel());
1✔
1570
      return oobChannel;
1✔
1571
    }
1572

1573
    @Deprecated
1574
    @Override
1575
    public ManagedChannelBuilder<?> createResolvingOobChannelBuilder(String target) {
1576
      return createResolvingOobChannelBuilder(target, new DefaultChannelCreds())
1✔
1577
          // Override authority to keep the old behavior.
1578
          // createResolvingOobChannelBuilder(String target) will be deleted soon.
1579
          .overrideAuthority(getAuthority());
1✔
1580
    }
1581

1582
    // TODO(creamsoup) prevent main channel to shutdown if oob channel is not terminated
1583
    // TODO(zdapeng) register the channel as a subchannel of the parent channel in channelz.
1584
    @Override
1585
    public ManagedChannelBuilder<?> createResolvingOobChannelBuilder(
1586
        final String target, final ChannelCredentials channelCreds) {
1587
      checkNotNull(channelCreds, "channelCreds");
1✔
1588

1589
      final class ResolvingOobChannelBuilder
1590
          extends ForwardingChannelBuilder<ResolvingOobChannelBuilder> {
1591
        final ManagedChannelBuilder<?> delegate;
1592

1593
        ResolvingOobChannelBuilder() {
1✔
1594
          final ClientTransportFactory transportFactory;
1595
          CallCredentials callCredentials;
1596
          if (channelCreds instanceof DefaultChannelCreds) {
1✔
1597
            transportFactory = originalTransportFactory;
1✔
1598
            callCredentials = null;
1✔
1599
          } else {
1600
            SwapChannelCredentialsResult swapResult =
1✔
1601
                originalTransportFactory.swapChannelCredentials(channelCreds);
1✔
1602
            if (swapResult == null) {
1✔
1603
              delegate = Grpc.newChannelBuilder(target, channelCreds);
×
1604
              return;
×
1605
            } else {
1606
              transportFactory = swapResult.transportFactory;
1✔
1607
              callCredentials = swapResult.callCredentials;
1✔
1608
            }
1609
          }
1610
          ClientTransportFactoryBuilder transportFactoryBuilder =
1✔
1611
              new ClientTransportFactoryBuilder() {
1✔
1612
                @Override
1613
                public ClientTransportFactory buildClientTransportFactory() {
1614
                  return transportFactory;
1✔
1615
                }
1616
              };
1617
          delegate = new ManagedChannelImplBuilder(
1✔
1618
              target,
1619
              channelCreds,
1620
              callCredentials,
1621
              transportFactoryBuilder,
1622
              new FixedPortProvider(nameResolverArgs.getDefaultPort()));
1✔
1623
        }
1✔
1624

1625
        @Override
1626
        protected ManagedChannelBuilder<?> delegate() {
1627
          return delegate;
1✔
1628
        }
1629
      }
1630

1631
      checkState(!terminated, "Channel is terminated");
1✔
1632

1633
      @SuppressWarnings("deprecation")
1634
      ResolvingOobChannelBuilder builder = new ResolvingOobChannelBuilder()
1✔
1635
          .nameResolverFactory(nameResolverFactory);
1✔
1636

1637
      return builder
1✔
1638
          // TODO(zdapeng): executors should not outlive the parent channel.
1639
          .executor(executor)
1✔
1640
          .offloadExecutor(offloadExecutorHolder.getExecutor())
1✔
1641
          .maxTraceEvents(maxTraceEvents)
1✔
1642
          .proxyDetector(nameResolverArgs.getProxyDetector())
1✔
1643
          .userAgent(userAgent);
1✔
1644
    }
1645

1646
    @Override
1647
    public ChannelCredentials getUnsafeChannelCredentials() {
1648
      if (originalChannelCreds == null) {
×
1649
        return new DefaultChannelCreds();
×
1650
      }
1651
      return originalChannelCreds;
×
1652
    }
1653

1654
    @Override
1655
    public void updateOobChannelAddresses(ManagedChannel channel, EquivalentAddressGroup eag) {
1656
      updateOobChannelAddresses(channel, Collections.singletonList(eag));
×
1657
    }
×
1658

1659
    @Override
1660
    public void updateOobChannelAddresses(ManagedChannel channel,
1661
        List<EquivalentAddressGroup> eag) {
1662
      checkArgument(channel instanceof OobChannel,
1✔
1663
          "channel must have been returned from createOobChannel");
1664
      ((OobChannel) channel).updateAddresses(eag);
1✔
1665
    }
1✔
1666

1667
    @Override
1668
    public String getAuthority() {
1669
      return ManagedChannelImpl.this.authority();
1✔
1670
    }
1671

1672
    @Override
1673
    public SynchronizationContext getSynchronizationContext() {
1674
      return syncContext;
1✔
1675
    }
1676

1677
    @Override
1678
    public ScheduledExecutorService getScheduledExecutorService() {
1679
      return scheduledExecutor;
1✔
1680
    }
1681

1682
    @Override
1683
    public ChannelLogger getChannelLogger() {
1684
      return channelLogger;
1✔
1685
    }
1686

1687
    @Override
1688
    public NameResolver.Args getNameResolverArgs() {
1689
      return nameResolverArgs;
1✔
1690
    }
1691

1692
    @Override
1693
    public NameResolverRegistry getNameResolverRegistry() {
1694
      return nameResolverRegistry;
1✔
1695
    }
1696

1697
    /**
1698
     * A placeholder for channel creds if user did not specify channel creds for the channel.
1699
     */
1700
    // TODO(zdapeng): get rid of this class and let all ChannelBuilders always provide a non-null
1701
    //     channel creds.
1702
    final class DefaultChannelCreds extends ChannelCredentials {
1✔
1703
      @Override
1704
      public ChannelCredentials withoutBearerTokens() {
1705
        return this;
×
1706
      }
1707
    }
1708
  }
1709

1710
  final class NameResolverListener extends NameResolver.Listener2 {
1711
    final LbHelperImpl helper;
1712
    final NameResolver resolver;
1713

1714
    NameResolverListener(LbHelperImpl helperImpl, NameResolver resolver) {
1✔
1715
      this.helper = checkNotNull(helperImpl, "helperImpl");
1✔
1716
      this.resolver = checkNotNull(resolver, "resolver");
1✔
1717
    }
1✔
1718

1719
    @Override
1720
    public void onResult(final ResolutionResult resolutionResult) {
1721
      final class NamesResolved implements Runnable {
1✔
1722

1723
        @SuppressWarnings("ReferenceEquality")
1724
        @Override
1725
        public void run() {
1726
          if (ManagedChannelImpl.this.nameResolver != resolver) {
1✔
1727
            return;
1✔
1728
          }
1729

1730
          List<EquivalentAddressGroup> servers = resolutionResult.getAddresses();
1✔
1731
          channelLogger.log(
1✔
1732
              ChannelLogLevel.DEBUG,
1733
              "Resolved address: {0}, config={1}",
1734
              servers,
1735
              resolutionResult.getAttributes());
1✔
1736

1737
          if (lastResolutionState != ResolutionState.SUCCESS) {
1✔
1738
            channelLogger.log(ChannelLogLevel.INFO, "Address resolved: {0}", servers);
1✔
1739
            lastResolutionState = ResolutionState.SUCCESS;
1✔
1740
          }
1741

1742
          ConfigOrError configOrError = resolutionResult.getServiceConfig();
1✔
1743
          ResolutionResultListener resolutionResultListener = resolutionResult.getAttributes()
1✔
1744
              .get(RetryingNameResolver.RESOLUTION_RESULT_LISTENER_KEY);
1✔
1745
          InternalConfigSelector resolvedConfigSelector =
1✔
1746
              resolutionResult.getAttributes().get(InternalConfigSelector.KEY);
1✔
1747
          ManagedChannelServiceConfig validServiceConfig =
1748
              configOrError != null && configOrError.getConfig() != null
1✔
1749
                  ? (ManagedChannelServiceConfig) configOrError.getConfig()
1✔
1750
                  : null;
1✔
1751
          Status serviceConfigError = configOrError != null ? configOrError.getError() : null;
1✔
1752

1753
          ManagedChannelServiceConfig effectiveServiceConfig;
1754
          if (!lookUpServiceConfig) {
1✔
1755
            if (validServiceConfig != null) {
1✔
1756
              channelLogger.log(
1✔
1757
                  ChannelLogLevel.INFO,
1758
                  "Service config from name resolver discarded by channel settings");
1759
            }
1760
            effectiveServiceConfig =
1761
                defaultServiceConfig == null ? EMPTY_SERVICE_CONFIG : defaultServiceConfig;
1✔
1762
            if (resolvedConfigSelector != null) {
1✔
1763
              channelLogger.log(
1✔
1764
                  ChannelLogLevel.INFO,
1765
                  "Config selector from name resolver discarded by channel settings");
1766
            }
1767
            realChannel.updateConfigSelector(effectiveServiceConfig.getDefaultConfigSelector());
1✔
1768
          } else {
1769
            // Try to use config if returned from name resolver
1770
            // Otherwise, try to use the default config if available
1771
            if (validServiceConfig != null) {
1✔
1772
              effectiveServiceConfig = validServiceConfig;
1✔
1773
              if (resolvedConfigSelector != null) {
1✔
1774
                realChannel.updateConfigSelector(resolvedConfigSelector);
1✔
1775
                if (effectiveServiceConfig.getDefaultConfigSelector() != null) {
1✔
1776
                  channelLogger.log(
×
1777
                      ChannelLogLevel.DEBUG,
1778
                      "Method configs in service config will be discarded due to presence of"
1779
                          + "config-selector");
1780
                }
1781
              } else {
1782
                realChannel.updateConfigSelector(effectiveServiceConfig.getDefaultConfigSelector());
1✔
1783
              }
1784
            } else if (defaultServiceConfig != null) {
1✔
1785
              effectiveServiceConfig = defaultServiceConfig;
1✔
1786
              realChannel.updateConfigSelector(effectiveServiceConfig.getDefaultConfigSelector());
1✔
1787
              channelLogger.log(
1✔
1788
                  ChannelLogLevel.INFO,
1789
                  "Received no service config, using default service config");
1790
            } else if (serviceConfigError != null) {
1✔
1791
              if (!serviceConfigUpdated) {
1✔
1792
                // First DNS lookup has invalid service config, and cannot fall back to default
1793
                channelLogger.log(
1✔
1794
                    ChannelLogLevel.INFO,
1795
                    "Fallback to error due to invalid first service config without default config");
1796
                // This error could be an "inappropriate" control plane error that should not bleed
1797
                // through to client code using gRPC. We let them flow through here to the LB as
1798
                // we later check for these error codes when investigating pick results in
1799
                // GrpcUtil.getTransportFromPickResult().
1800
                onError(configOrError.getError());
1✔
1801
                if (resolutionResultListener != null) {
1✔
1802
                  resolutionResultListener.resolutionAttempted(false);
1✔
1803
                }
1804
                return;
1✔
1805
              } else {
1806
                effectiveServiceConfig = lastServiceConfig;
1✔
1807
              }
1808
            } else {
1809
              effectiveServiceConfig = EMPTY_SERVICE_CONFIG;
1✔
1810
              realChannel.updateConfigSelector(null);
1✔
1811
            }
1812
            if (!effectiveServiceConfig.equals(lastServiceConfig)) {
1✔
1813
              channelLogger.log(
1✔
1814
                  ChannelLogLevel.INFO,
1815
                  "Service config changed{0}",
1816
                  effectiveServiceConfig == EMPTY_SERVICE_CONFIG ? " to empty" : "");
1✔
1817
              lastServiceConfig = effectiveServiceConfig;
1✔
1818
              transportProvider.throttle = effectiveServiceConfig.getRetryThrottling();
1✔
1819
            }
1820

1821
            try {
1822
              // TODO(creamsoup): when `servers` is empty and lastResolutionStateCopy == SUCCESS
1823
              //  and lbNeedAddress, it shouldn't call the handleServiceConfigUpdate. But,
1824
              //  lbNeedAddress is not deterministic
1825
              serviceConfigUpdated = true;
1✔
1826
            } catch (RuntimeException re) {
×
1827
              logger.log(
×
1828
                  Level.WARNING,
1829
                  "[" + getLogId() + "] Unexpected exception from parsing service config",
×
1830
                  re);
1831
            }
1✔
1832
          }
1833

1834
          Attributes effectiveAttrs = resolutionResult.getAttributes();
1✔
1835
          // Call LB only if it's not shutdown.  If LB is shutdown, lbHelper won't match.
1836
          if (NameResolverListener.this.helper == ManagedChannelImpl.this.lbHelper) {
1✔
1837
            Attributes.Builder attrBuilder =
1✔
1838
                effectiveAttrs.toBuilder().discard(InternalConfigSelector.KEY);
1✔
1839
            Map<String, ?> healthCheckingConfig =
1✔
1840
                effectiveServiceConfig.getHealthCheckingConfig();
1✔
1841
            if (healthCheckingConfig != null) {
1✔
1842
              attrBuilder
1✔
1843
                  .set(LoadBalancer.ATTR_HEALTH_CHECKING_CONFIG, healthCheckingConfig)
1✔
1844
                  .build();
1✔
1845
            }
1846
            Attributes attributes = attrBuilder.build();
1✔
1847

1848
            boolean lastAddressesAccepted = helper.lb.tryAcceptResolvedAddresses(
1✔
1849
                ResolvedAddresses.newBuilder()
1✔
1850
                    .setAddresses(servers)
1✔
1851
                    .setAttributes(attributes)
1✔
1852
                    .setLoadBalancingPolicyConfig(effectiveServiceConfig.getLoadBalancingConfig())
1✔
1853
                    .build());
1✔
1854
            // If a listener is provided, let it know if the addresses were accepted.
1855
            if (resolutionResultListener != null) {
1✔
1856
              resolutionResultListener.resolutionAttempted(lastAddressesAccepted);
1✔
1857
            }
1858
          }
1859
        }
1✔
1860
      }
1861

1862
      syncContext.execute(new NamesResolved());
1✔
1863
    }
1✔
1864

1865
    @Override
1866
    public void onError(final Status error) {
1867
      checkArgument(!error.isOk(), "the error status must not be OK");
1✔
1868
      final class NameResolverErrorHandler implements Runnable {
1✔
1869
        @Override
1870
        public void run() {
1871
          handleErrorInSyncContext(error);
1✔
1872
        }
1✔
1873
      }
1874

1875
      syncContext.execute(new NameResolverErrorHandler());
1✔
1876
    }
1✔
1877

1878
    private void handleErrorInSyncContext(Status error) {
1879
      logger.log(Level.WARNING, "[{0}] Failed to resolve name. status={1}",
1✔
1880
          new Object[] {getLogId(), error});
1✔
1881
      realChannel.onConfigError();
1✔
1882
      if (lastResolutionState != ResolutionState.ERROR) {
1✔
1883
        channelLogger.log(ChannelLogLevel.WARNING, "Failed to resolve name: {0}", error);
1✔
1884
        lastResolutionState = ResolutionState.ERROR;
1✔
1885
      }
1886
      // Call LB only if it's not shutdown.  If LB is shutdown, lbHelper won't match.
1887
      if (NameResolverListener.this.helper != ManagedChannelImpl.this.lbHelper) {
1✔
1888
        return;
1✔
1889
      }
1890

1891
      helper.lb.handleNameResolutionError(error);
1✔
1892
    }
1✔
1893
  }
1894

1895
  private final class SubchannelImpl extends AbstractSubchannel {
1896
    final CreateSubchannelArgs args;
1897
    final InternalLogId subchannelLogId;
1898
    final ChannelLoggerImpl subchannelLogger;
1899
    final ChannelTracer subchannelTracer;
1900
    List<EquivalentAddressGroup> addressGroups;
1901
    InternalSubchannel subchannel;
1902
    boolean started;
1903
    boolean shutdown;
1904
    ScheduledHandle delayedShutdownTask;
1905

1906
    SubchannelImpl(CreateSubchannelArgs args) {
1✔
1907
      checkNotNull(args, "args");
1✔
1908
      addressGroups = args.getAddresses();
1✔
1909
      if (authorityOverride != null) {
1✔
1910
        List<EquivalentAddressGroup> eagsWithoutOverrideAttr =
1✔
1911
            stripOverrideAuthorityAttributes(args.getAddresses());
1✔
1912
        args = args.toBuilder().setAddresses(eagsWithoutOverrideAttr).build();
1✔
1913
      }
1914
      this.args = args;
1✔
1915
      subchannelLogId = InternalLogId.allocate("Subchannel", /*details=*/ authority());
1✔
1916
      subchannelTracer = new ChannelTracer(
1✔
1917
          subchannelLogId, maxTraceEvents, timeProvider.currentTimeNanos(),
1✔
1918
          "Subchannel for " + args.getAddresses());
1✔
1919
      subchannelLogger = new ChannelLoggerImpl(subchannelTracer, timeProvider);
1✔
1920
    }
1✔
1921

1922
    @Override
1923
    public void start(final SubchannelStateListener listener) {
1924
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1925
      checkState(!started, "already started");
1✔
1926
      checkState(!shutdown, "already shutdown");
1✔
1927
      checkState(!terminating, "Channel is being terminated");
1✔
1928
      started = true;
1✔
1929
      final class ManagedInternalSubchannelCallback extends InternalSubchannel.Callback {
1✔
1930
        // All callbacks are run in syncContext
1931
        @Override
1932
        void onTerminated(InternalSubchannel is) {
1933
          subchannels.remove(is);
1✔
1934
          channelz.removeSubchannel(is);
1✔
1935
          maybeTerminateChannel();
1✔
1936
        }
1✔
1937

1938
        @Override
1939
        void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) {
1940
          checkState(listener != null, "listener is null");
1✔
1941
          listener.onSubchannelState(newState);
1✔
1942
        }
1✔
1943

1944
        @Override
1945
        void onInUse(InternalSubchannel is) {
1946
          inUseStateAggregator.updateObjectInUse(is, true);
1✔
1947
        }
1✔
1948

1949
        @Override
1950
        void onNotInUse(InternalSubchannel is) {
1951
          inUseStateAggregator.updateObjectInUse(is, false);
1✔
1952
        }
1✔
1953
      }
1954

1955
      final InternalSubchannel internalSubchannel = new InternalSubchannel(
1✔
1956
          args.getAddresses(),
1✔
1957
          authority(),
1✔
1958
          userAgent,
1✔
1959
          backoffPolicyProvider,
1✔
1960
          transportFactory,
1✔
1961
          transportFactory.getScheduledExecutorService(),
1✔
1962
          stopwatchSupplier,
1✔
1963
          syncContext,
1964
          new ManagedInternalSubchannelCallback(),
1965
          channelz,
1✔
1966
          callTracerFactory.create(),
1✔
1967
          subchannelTracer,
1968
          subchannelLogId,
1969
          subchannelLogger);
1970

1971
      channelTracer.reportEvent(new ChannelTrace.Event.Builder()
1✔
1972
          .setDescription("Child Subchannel started")
1✔
1973
          .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
1✔
1974
          .setTimestampNanos(timeProvider.currentTimeNanos())
1✔
1975
          .setSubchannelRef(internalSubchannel)
1✔
1976
          .build());
1✔
1977

1978
      this.subchannel = internalSubchannel;
1✔
1979
      channelz.addSubchannel(internalSubchannel);
1✔
1980
      subchannels.add(internalSubchannel);
1✔
1981
    }
1✔
1982

1983
    @Override
1984
    InternalInstrumented<ChannelStats> getInstrumentedInternalSubchannel() {
1985
      checkState(started, "not started");
1✔
1986
      return subchannel;
1✔
1987
    }
1988

1989
    @Override
1990
    public void shutdown() {
1991
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1992
      if (subchannel == null) {
1✔
1993
        // start() was not successful
1994
        shutdown = true;
×
1995
        return;
×
1996
      }
1997
      if (shutdown) {
1✔
1998
        if (terminating && delayedShutdownTask != null) {
1✔
1999
          // shutdown() was previously called when terminating == false, thus a delayed shutdown()
2000
          // was scheduled.  Now since terminating == true, We should expedite the shutdown.
2001
          delayedShutdownTask.cancel();
×
2002
          delayedShutdownTask = null;
×
2003
          // Will fall through to the subchannel.shutdown() at the end.
2004
        } else {
2005
          return;
1✔
2006
        }
2007
      } else {
2008
        shutdown = true;
1✔
2009
      }
2010
      // Add a delay to shutdown to deal with the race between 1) a transport being picked and
2011
      // newStream() being called on it, and 2) its Subchannel is shut down by LoadBalancer (e.g.,
2012
      // because of address change, or because LoadBalancer is shutdown by Channel entering idle
2013
      // mode). If (2) wins, the app will see a spurious error. We work around this by delaying
2014
      // shutdown of Subchannel for a few seconds here.
2015
      //
2016
      // TODO(zhangkun83): consider a better approach
2017
      // (https://github.com/grpc/grpc-java/issues/2562).
2018
      if (!terminating) {
1✔
2019
        final class ShutdownSubchannel implements Runnable {
1✔
2020
          @Override
2021
          public void run() {
2022
            subchannel.shutdown(SUBCHANNEL_SHUTDOWN_STATUS);
1✔
2023
          }
1✔
2024
        }
2025

2026
        delayedShutdownTask = syncContext.schedule(
1✔
2027
            new LogExceptionRunnable(new ShutdownSubchannel()),
2028
            SUBCHANNEL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS,
2029
            transportFactory.getScheduledExecutorService());
1✔
2030
        return;
1✔
2031
      }
2032
      // When terminating == true, no more real streams will be created. It's safe and also
2033
      // desirable to shutdown timely.
2034
      subchannel.shutdown(SHUTDOWN_STATUS);
1✔
2035
    }
1✔
2036

2037
    @Override
2038
    public void requestConnection() {
2039
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
2040
      checkState(started, "not started");
1✔
2041
      subchannel.obtainActiveTransport();
1✔
2042
    }
1✔
2043

2044
    @Override
2045
    public List<EquivalentAddressGroup> getAllAddresses() {
2046
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
2047
      checkState(started, "not started");
1✔
2048
      return addressGroups;
1✔
2049
    }
2050

2051
    @Override
2052
    public Attributes getAttributes() {
2053
      return args.getAttributes();
1✔
2054
    }
2055

2056
    @Override
2057
    public String toString() {
2058
      return subchannelLogId.toString();
1✔
2059
    }
2060

2061
    @Override
2062
    public Channel asChannel() {
2063
      checkState(started, "not started");
1✔
2064
      return new SubchannelChannel(
1✔
2065
          subchannel, balancerRpcExecutorHolder.getExecutor(),
1✔
2066
          transportFactory.getScheduledExecutorService(),
1✔
2067
          callTracerFactory.create(),
1✔
2068
          new AtomicReference<InternalConfigSelector>(null));
2069
    }
2070

2071
    @Override
2072
    public Object getInternalSubchannel() {
2073
      checkState(started, "Subchannel is not started");
1✔
2074
      return subchannel;
1✔
2075
    }
2076

2077
    @Override
2078
    public ChannelLogger getChannelLogger() {
2079
      return subchannelLogger;
1✔
2080
    }
2081

2082
    @Override
2083
    public void updateAddresses(List<EquivalentAddressGroup> addrs) {
2084
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
2085
      addressGroups = addrs;
1✔
2086
      if (authorityOverride != null) {
1✔
2087
        addrs = stripOverrideAuthorityAttributes(addrs);
1✔
2088
      }
2089
      subchannel.updateAddresses(addrs);
1✔
2090
    }
1✔
2091

2092
    private List<EquivalentAddressGroup> stripOverrideAuthorityAttributes(
2093
        List<EquivalentAddressGroup> eags) {
2094
      List<EquivalentAddressGroup> eagsWithoutOverrideAttr = new ArrayList<>();
1✔
2095
      for (EquivalentAddressGroup eag : eags) {
1✔
2096
        EquivalentAddressGroup eagWithoutOverrideAttr = new EquivalentAddressGroup(
1✔
2097
            eag.getAddresses(),
1✔
2098
            eag.getAttributes().toBuilder().discard(ATTR_AUTHORITY_OVERRIDE).build());
1✔
2099
        eagsWithoutOverrideAttr.add(eagWithoutOverrideAttr);
1✔
2100
      }
1✔
2101
      return Collections.unmodifiableList(eagsWithoutOverrideAttr);
1✔
2102
    }
2103
  }
2104

2105
  @Override
2106
  public String toString() {
2107
    return MoreObjects.toStringHelper(this)
1✔
2108
        .add("logId", logId.getId())
1✔
2109
        .add("target", target)
1✔
2110
        .toString();
1✔
2111
  }
2112

2113
  /**
2114
   * Called from syncContext.
2115
   */
2116
  private final class DelayedTransportListener implements ManagedClientTransport.Listener {
1✔
2117
    @Override
2118
    public void transportShutdown(Status s) {
2119
      checkState(shutdown.get(), "Channel must have been shut down");
1✔
2120
    }
1✔
2121

2122
    @Override
2123
    public void transportReady() {
2124
      // Don't care
2125
    }
×
2126

2127
    @Override
2128
    public void transportInUse(final boolean inUse) {
2129
      inUseStateAggregator.updateObjectInUse(delayedTransport, inUse);
1✔
2130
    }
1✔
2131

2132
    @Override
2133
    public void transportTerminated() {
2134
      checkState(shutdown.get(), "Channel must have been shut down");
1✔
2135
      terminating = true;
1✔
2136
      shutdownNameResolverAndLoadBalancer(false);
1✔
2137
      // No need to call channelStateManager since we are already in SHUTDOWN state.
2138
      // Until LoadBalancer is shutdown, it may still create new subchannels.  We catch them
2139
      // here.
2140
      maybeShutdownNowSubchannels();
1✔
2141
      maybeTerminateChannel();
1✔
2142
    }
1✔
2143
  }
2144

2145
  /**
2146
   * Must be accessed from syncContext.
2147
   */
2148
  private final class IdleModeStateAggregator extends InUseStateAggregator<Object> {
1✔
2149
    @Override
2150
    protected void handleInUse() {
2151
      exitIdleMode();
1✔
2152
    }
1✔
2153

2154
    @Override
2155
    protected void handleNotInUse() {
2156
      if (shutdown.get()) {
1✔
2157
        return;
1✔
2158
      }
2159
      rescheduleIdleTimer();
1✔
2160
    }
1✔
2161
  }
2162

2163
  /**
2164
   * Lazily request for Executor from an executor pool.
2165
   * Also act as an Executor directly to simply run a cmd
2166
   */
2167
  @VisibleForTesting
2168
  static final class ExecutorHolder implements Executor {
2169
    private final ObjectPool<? extends Executor> pool;
2170
    private Executor executor;
2171

2172
    ExecutorHolder(ObjectPool<? extends Executor> executorPool) {
1✔
2173
      this.pool = checkNotNull(executorPool, "executorPool");
1✔
2174
    }
1✔
2175

2176
    synchronized Executor getExecutor() {
2177
      if (executor == null) {
1✔
2178
        executor = checkNotNull(pool.getObject(), "%s.getObject()", executor);
1✔
2179
      }
2180
      return executor;
1✔
2181
    }
2182

2183
    synchronized void release() {
2184
      if (executor != null) {
1✔
2185
        executor = pool.returnObject(executor);
1✔
2186
      }
2187
    }
1✔
2188

2189
    @Override
2190
    public void execute(Runnable command) {
2191
      getExecutor().execute(command);
1✔
2192
    }
1✔
2193
  }
2194

2195
  private static final class RestrictedScheduledExecutor implements ScheduledExecutorService {
2196
    final ScheduledExecutorService delegate;
2197

2198
    private RestrictedScheduledExecutor(ScheduledExecutorService delegate) {
1✔
2199
      this.delegate = checkNotNull(delegate, "delegate");
1✔
2200
    }
1✔
2201

2202
    @Override
2203
    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
2204
      return delegate.schedule(callable, delay, unit);
×
2205
    }
2206

2207
    @Override
2208
    public ScheduledFuture<?> schedule(Runnable cmd, long delay, TimeUnit unit) {
2209
      return delegate.schedule(cmd, delay, unit);
1✔
2210
    }
2211

2212
    @Override
2213
    public ScheduledFuture<?> scheduleAtFixedRate(
2214
        Runnable command, long initialDelay, long period, TimeUnit unit) {
2215
      return delegate.scheduleAtFixedRate(command, initialDelay, period, unit);
×
2216
    }
2217

2218
    @Override
2219
    public ScheduledFuture<?> scheduleWithFixedDelay(
2220
        Runnable command, long initialDelay, long delay, TimeUnit unit) {
2221
      return delegate.scheduleWithFixedDelay(command, initialDelay, delay, unit);
×
2222
    }
2223

2224
    @Override
2225
    public boolean awaitTermination(long timeout, TimeUnit unit)
2226
        throws InterruptedException {
2227
      return delegate.awaitTermination(timeout, unit);
×
2228
    }
2229

2230
    @Override
2231
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
2232
        throws InterruptedException {
2233
      return delegate.invokeAll(tasks);
×
2234
    }
2235

2236
    @Override
2237
    public <T> List<Future<T>> invokeAll(
2238
        Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
2239
        throws InterruptedException {
2240
      return delegate.invokeAll(tasks, timeout, unit);
×
2241
    }
2242

2243
    @Override
2244
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
2245
        throws InterruptedException, ExecutionException {
2246
      return delegate.invokeAny(tasks);
×
2247
    }
2248

2249
    @Override
2250
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
2251
        throws InterruptedException, ExecutionException, TimeoutException {
2252
      return delegate.invokeAny(tasks, timeout, unit);
×
2253
    }
2254

2255
    @Override
2256
    public boolean isShutdown() {
2257
      return delegate.isShutdown();
×
2258
    }
2259

2260
    @Override
2261
    public boolean isTerminated() {
2262
      return delegate.isTerminated();
×
2263
    }
2264

2265
    @Override
2266
    public void shutdown() {
2267
      throw new UnsupportedOperationException("Restricted: shutdown() is not allowed");
1✔
2268
    }
2269

2270
    @Override
2271
    public List<Runnable> shutdownNow() {
2272
      throw new UnsupportedOperationException("Restricted: shutdownNow() is not allowed");
1✔
2273
    }
2274

2275
    @Override
2276
    public <T> Future<T> submit(Callable<T> task) {
2277
      return delegate.submit(task);
×
2278
    }
2279

2280
    @Override
2281
    public Future<?> submit(Runnable task) {
2282
      return delegate.submit(task);
×
2283
    }
2284

2285
    @Override
2286
    public <T> Future<T> submit(Runnable task, T result) {
2287
      return delegate.submit(task, result);
×
2288
    }
2289

2290
    @Override
2291
    public void execute(Runnable command) {
2292
      delegate.execute(command);
×
2293
    }
×
2294
  }
2295

2296
  /**
   * Describes how the most recent name resolution turned out.
   *
   * <p>The declaration order of the constants is kept stable so that ordinals do not change.
   */
  enum ResolutionState {
    // NOTE(review): presumably the state before any resolution result exists; confirm at usages.
    NO_RESOLUTION,
    // NOTE(review): presumably recorded when the last resolution succeeded; confirm at usages.
    SUCCESS,
    // NOTE(review): presumably recorded when the last resolution failed; confirm at usages.
    ERROR;
  }
2304
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc