• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #18826

08 Sep 2023 10:09PM UTC coverage: 88.282% (-0.02%) from 88.299%
#18826

push

github-actions

web-flow
core: DEADLINE_EXCEEDED gives hints for slow resolver (#10545)

30345 of 34373 relevant lines covered (88.28%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.5
/../core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
1
/*
2
 * Copyright 2016 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static com.google.common.base.Preconditions.checkState;
22
import static io.grpc.ClientStreamTracer.NAME_RESOLUTION_DELAYED;
23
import static io.grpc.ConnectivityState.IDLE;
24
import static io.grpc.ConnectivityState.SHUTDOWN;
25
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
26
import static io.grpc.EquivalentAddressGroup.ATTR_AUTHORITY_OVERRIDE;
27

28
import com.google.common.annotations.VisibleForTesting;
29
import com.google.common.base.MoreObjects;
30
import com.google.common.base.Stopwatch;
31
import com.google.common.base.Supplier;
32
import com.google.common.util.concurrent.ListenableFuture;
33
import com.google.common.util.concurrent.SettableFuture;
34
import io.grpc.Attributes;
35
import io.grpc.CallCredentials;
36
import io.grpc.CallOptions;
37
import io.grpc.Channel;
38
import io.grpc.ChannelCredentials;
39
import io.grpc.ChannelLogger;
40
import io.grpc.ChannelLogger.ChannelLogLevel;
41
import io.grpc.ClientCall;
42
import io.grpc.ClientInterceptor;
43
import io.grpc.ClientInterceptors;
44
import io.grpc.ClientStreamTracer;
45
import io.grpc.CompressorRegistry;
46
import io.grpc.ConnectivityState;
47
import io.grpc.ConnectivityStateInfo;
48
import io.grpc.Context;
49
import io.grpc.Deadline;
50
import io.grpc.DecompressorRegistry;
51
import io.grpc.EquivalentAddressGroup;
52
import io.grpc.ForwardingChannelBuilder;
53
import io.grpc.ForwardingClientCall;
54
import io.grpc.Grpc;
55
import io.grpc.InternalChannelz;
56
import io.grpc.InternalChannelz.ChannelStats;
57
import io.grpc.InternalChannelz.ChannelTrace;
58
import io.grpc.InternalConfigSelector;
59
import io.grpc.InternalInstrumented;
60
import io.grpc.InternalLogId;
61
import io.grpc.InternalWithLogId;
62
import io.grpc.LoadBalancer;
63
import io.grpc.LoadBalancer.CreateSubchannelArgs;
64
import io.grpc.LoadBalancer.PickResult;
65
import io.grpc.LoadBalancer.PickSubchannelArgs;
66
import io.grpc.LoadBalancer.ResolvedAddresses;
67
import io.grpc.LoadBalancer.SubchannelPicker;
68
import io.grpc.LoadBalancer.SubchannelStateListener;
69
import io.grpc.ManagedChannel;
70
import io.grpc.ManagedChannelBuilder;
71
import io.grpc.Metadata;
72
import io.grpc.MethodDescriptor;
73
import io.grpc.NameResolver;
74
import io.grpc.NameResolver.ConfigOrError;
75
import io.grpc.NameResolver.ResolutionResult;
76
import io.grpc.NameResolverRegistry;
77
import io.grpc.ProxyDetector;
78
import io.grpc.Status;
79
import io.grpc.SynchronizationContext;
80
import io.grpc.SynchronizationContext.ScheduledHandle;
81
import io.grpc.internal.AutoConfiguredLoadBalancerFactory.AutoConfiguredLoadBalancer;
82
import io.grpc.internal.ClientCallImpl.ClientStreamProvider;
83
import io.grpc.internal.ClientTransportFactory.SwapChannelCredentialsResult;
84
import io.grpc.internal.ManagedChannelImplBuilder.ClientTransportFactoryBuilder;
85
import io.grpc.internal.ManagedChannelImplBuilder.FixedPortProvider;
86
import io.grpc.internal.ManagedChannelServiceConfig.MethodInfo;
87
import io.grpc.internal.ManagedChannelServiceConfig.ServiceConfigConvertedSelector;
88
import io.grpc.internal.RetriableStream.ChannelBufferMeter;
89
import io.grpc.internal.RetriableStream.Throttle;
90
import io.grpc.internal.RetryingNameResolver.ResolutionResultListener;
91
import java.net.URI;
92
import java.net.URISyntaxException;
93
import java.util.ArrayList;
94
import java.util.Collection;
95
import java.util.Collections;
96
import java.util.HashSet;
97
import java.util.LinkedHashSet;
98
import java.util.List;
99
import java.util.Map;
100
import java.util.Set;
101
import java.util.concurrent.Callable;
102
import java.util.concurrent.CountDownLatch;
103
import java.util.concurrent.ExecutionException;
104
import java.util.concurrent.Executor;
105
import java.util.concurrent.Future;
106
import java.util.concurrent.ScheduledExecutorService;
107
import java.util.concurrent.ScheduledFuture;
108
import java.util.concurrent.TimeUnit;
109
import java.util.concurrent.TimeoutException;
110
import java.util.concurrent.atomic.AtomicBoolean;
111
import java.util.concurrent.atomic.AtomicReference;
112
import java.util.logging.Level;
113
import java.util.logging.Logger;
114
import java.util.regex.Pattern;
115
import javax.annotation.Nullable;
116
import javax.annotation.concurrent.GuardedBy;
117
import javax.annotation.concurrent.ThreadSafe;
118

119
/** A communication channel for making outgoing RPCs. */
120
@ThreadSafe
121
final class ManagedChannelImpl extends ManagedChannel implements
122
    InternalInstrumented<ChannelStats> {
123
  @VisibleForTesting
124
  static final Logger logger = Logger.getLogger(ManagedChannelImpl.class.getName());
1✔
125

126
  // Matching this pattern means the target string is a URI target or at least intended to be one.
127
  // A URI target must be an absolute hierarchical URI.
128
  // From RFC 2396: scheme = alpha *( alpha | digit | "+" | "-" | "." )
129
  @VisibleForTesting
130
  static final Pattern URI_PATTERN = Pattern.compile("[a-zA-Z][a-zA-Z0-9+.-]*:/.*");
1✔
131

132
  static final long IDLE_TIMEOUT_MILLIS_DISABLE = -1;
133

134
  static final long SUBCHANNEL_SHUTDOWN_DELAY_SECONDS = 5;
135

136
  @VisibleForTesting
137
  static final Status SHUTDOWN_NOW_STATUS =
1✔
138
      Status.UNAVAILABLE.withDescription("Channel shutdownNow invoked");
1✔
139

140
  @VisibleForTesting
141
  static final Status SHUTDOWN_STATUS =
1✔
142
      Status.UNAVAILABLE.withDescription("Channel shutdown invoked");
1✔
143

144
  @VisibleForTesting
145
  static final Status SUBCHANNEL_SHUTDOWN_STATUS =
1✔
146
      Status.UNAVAILABLE.withDescription("Subchannel shutdown invoked");
1✔
147

148
  private static final ManagedChannelServiceConfig EMPTY_SERVICE_CONFIG =
149
      ManagedChannelServiceConfig.empty();
1✔
150
  private static final InternalConfigSelector INITIAL_PENDING_SELECTOR =
1✔
151
      new InternalConfigSelector() {
1✔
152
        @Override
153
        public Result selectConfig(PickSubchannelArgs args) {
154
          throw new IllegalStateException("Resolution is pending");
×
155
        }
156
      };
157

158
  private final InternalLogId logId;
159
  private final String target;
160
  @Nullable
161
  private final String authorityOverride;
162
  private final NameResolverRegistry nameResolverRegistry;
163
  private final NameResolver.Factory nameResolverFactory;
164
  private final NameResolver.Args nameResolverArgs;
165
  private final AutoConfiguredLoadBalancerFactory loadBalancerFactory;
166
  private final ClientTransportFactory originalTransportFactory;
167
  @Nullable
168
  private final ChannelCredentials originalChannelCreds;
169
  private final ClientTransportFactory transportFactory;
170
  private final ClientTransportFactory oobTransportFactory;
171
  private final RestrictedScheduledExecutor scheduledExecutor;
172
  private final Executor executor;
173
  private final ObjectPool<? extends Executor> executorPool;
174
  private final ObjectPool<? extends Executor> balancerRpcExecutorPool;
175
  private final ExecutorHolder balancerRpcExecutorHolder;
176
  private final ExecutorHolder offloadExecutorHolder;
177
  private final TimeProvider timeProvider;
178
  private final int maxTraceEvents;
179

180
  @VisibleForTesting
1✔
181
  final SynchronizationContext syncContext = new SynchronizationContext(
182
      new Thread.UncaughtExceptionHandler() {
1✔
183
        @Override
184
        public void uncaughtException(Thread t, Throwable e) {
185
          logger.log(
1✔
186
              Level.SEVERE,
187
              "[" + getLogId() + "] Uncaught exception in the SynchronizationContext. Panic!",
1✔
188
              e);
189
          panic(e);
1✔
190
        }
1✔
191
      });
192

193
  private boolean fullStreamDecompression;
194

195
  private final DecompressorRegistry decompressorRegistry;
196
  private final CompressorRegistry compressorRegistry;
197

198
  private final Supplier<Stopwatch> stopwatchSupplier;
199
  /** The timeout before entering idle mode. */
200
  private final long idleTimeoutMillis;
201

202
  private final ConnectivityStateManager channelStateManager = new ConnectivityStateManager();
1✔
203
  private final BackoffPolicy.Provider backoffPolicyProvider;
204

205
  /**
206
   * We delegate to this channel, so that we can have interceptors as necessary. If there aren't
207
   * any interceptors and the {@link io.grpc.BinaryLog} is {@code null} then this will just be a
208
   * {@link RealChannel}.
209
   */
210
  private final Channel interceptorChannel;
211
  @Nullable private final String userAgent;
212

213
  // Only null after channel is terminated. Must be assigned from the syncContext.
214
  private NameResolver nameResolver;
215

216
  // Must be accessed from the syncContext.
217
  private boolean nameResolverStarted;
218

219
  // null when channel is in idle mode.  Must be assigned from syncContext.
220
  @Nullable
221
  private LbHelperImpl lbHelper;
222

223
  // Must ONLY be assigned from updateSubchannelPicker(), which is called from syncContext.
224
  // null if channel is in idle mode.
225
  @Nullable
226
  private volatile SubchannelPicker subchannelPicker;
227

228
  // Must be accessed from the syncContext
229
  private boolean panicMode;
230

231
  // Must be mutated from syncContext
232
  // If any monitoring hook to be added later needs to get a snapshot of this Set, we could
233
  // switch to a ConcurrentHashMap.
234
  private final Set<InternalSubchannel> subchannels = new HashSet<>(16, .75f);
1✔
235

236
  // Must be accessed from syncContext
237
  @Nullable
238
  private Collection<RealChannel.PendingCall<?, ?>> pendingCalls;
239
  private final Object pendingCallsInUseObject = new Object();
1✔
240

241
  // Must be mutated from syncContext
242
  private final Set<OobChannel> oobChannels = new HashSet<>(1, .75f);
1✔
243

244
  // reprocess() must be run from syncContext
245
  private final DelayedClientTransport delayedTransport;
246
  private final UncommittedRetriableStreamsRegistry uncommittedRetriableStreamsRegistry
1✔
247
      = new UncommittedRetriableStreamsRegistry();
248

249
  // Shutdown states.
250
  //
251
  // Channel's shutdown process:
252
  // 1. shutdown(): stop accepting new calls from applications
253
  //   1a shutdown <- true
254
  //   1b subchannelPicker <- null
255
  //   1c delayedTransport.shutdown()
256
  // 2. delayedTransport terminated: stop stream-creation functionality
257
  //   2a terminating <- true
258
  //   2b loadBalancer.shutdown()
259
  //     * LoadBalancer will shutdown subchannels and OOB channels
260
  //   2c loadBalancer <- null
261
  //   2d nameResolver.shutdown()
262
  //   2e nameResolver <- null
263
  // 3. All subchannels and OOB channels terminated: Channel considered terminated
264

265
  private final AtomicBoolean shutdown = new AtomicBoolean(false);
1✔
266
  // Must only be mutated and read from syncContext
267
  private boolean shutdownNowed;
268
  // Must only be mutated from syncContext
269
  private boolean terminating;
270
  // Must be mutated from syncContext
271
  private volatile boolean terminated;
272
  private final CountDownLatch terminatedLatch = new CountDownLatch(1);
1✔
273

274
  private final CallTracer.Factory callTracerFactory;
275
  private final CallTracer channelCallTracer;
276
  private final ChannelTracer channelTracer;
277
  private final ChannelLogger channelLogger;
278
  private final InternalChannelz channelz;
279
  private final RealChannel realChannel;
280
  // Must be mutated and read from syncContext
281
  // a flag for doing channel tracing when flipped
282
  private ResolutionState lastResolutionState = ResolutionState.NO_RESOLUTION;
1✔
283
  // Must be mutated and read from constructor or syncContext
284
  // used for channel tracing when value changed
285
  private ManagedChannelServiceConfig lastServiceConfig = EMPTY_SERVICE_CONFIG;
1✔
286

287
  @Nullable
288
  private final ManagedChannelServiceConfig defaultServiceConfig;
289
  // Must be mutated and read from constructor or syncContext
290
  private boolean serviceConfigUpdated = false;
1✔
291
  private final boolean lookUpServiceConfig;
292

293
  // One instance per channel.
294
  private final ChannelBufferMeter channelBufferUsed = new ChannelBufferMeter();
1✔
295

296
  private final long perRpcBufferLimit;
297
  private final long channelBufferLimit;
298

299
  // Temporary false flag that can skip the retry code path.
300
  private final boolean retryEnabled;
301

302
  private final Deadline.Ticker ticker = Deadline.getSystemTicker();
1✔
303

304
  // Called from syncContext
305
  private final ManagedClientTransport.Listener delayedTransportListener =
1✔
306
      new DelayedTransportListener();
307

308
  // Must be called from syncContext
309
  private void maybeShutdownNowSubchannels() {
310
    if (shutdownNowed) {
1✔
311
      for (InternalSubchannel subchannel : subchannels) {
1✔
312
        subchannel.shutdownNow(SHUTDOWN_NOW_STATUS);
1✔
313
      }
1✔
314
      for (OobChannel oobChannel : oobChannels) {
1✔
315
        oobChannel.getInternalSubchannel().shutdownNow(SHUTDOWN_NOW_STATUS);
1✔
316
      }
1✔
317
    }
318
  }
1✔
319

320
  // Must be accessed from syncContext
321
  @VisibleForTesting
1✔
322
  final InUseStateAggregator<Object> inUseStateAggregator = new IdleModeStateAggregator();
323

324
  @Override
325
  public ListenableFuture<ChannelStats> getStats() {
326
    final SettableFuture<ChannelStats> ret = SettableFuture.create();
1✔
327
    final class StatsFetcher implements Runnable {
1✔
328
      @Override
329
      public void run() {
330
        ChannelStats.Builder builder = new InternalChannelz.ChannelStats.Builder();
1✔
331
        channelCallTracer.updateBuilder(builder);
1✔
332
        channelTracer.updateBuilder(builder);
1✔
333
        builder.setTarget(target).setState(channelStateManager.getState());
1✔
334
        List<InternalWithLogId> children = new ArrayList<>();
1✔
335
        children.addAll(subchannels);
1✔
336
        children.addAll(oobChannels);
1✔
337
        builder.setSubchannels(children);
1✔
338
        ret.set(builder.build());
1✔
339
      }
1✔
340
    }
341

342
    // subchannels and oobchannels can only be accessed from syncContext
343
    syncContext.execute(new StatsFetcher());
1✔
344
    return ret;
1✔
345
  }
346

347
  @Override
348
  public InternalLogId getLogId() {
349
    return logId;
1✔
350
  }
351

352
  // Run from syncContext
353
  private class IdleModeTimer implements Runnable {
1✔
354

355
    @Override
356
    public void run() {
357
      // Workaround timer scheduled while in idle mode. This can happen from handleNotInUse() after
358
      // an explicit enterIdleMode() by the user. Protecting here as other locations are a bit too
359
      // subtle to change rapidly to resolve the channel panic. See #8714
360
      if (lbHelper == null) {
1✔
361
        return;
×
362
      }
363
      enterIdleMode();
1✔
364
    }
1✔
365
  }
366

367
  // Must be called from syncContext
368
  private void shutdownNameResolverAndLoadBalancer(boolean channelIsActive) {
369
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
370
    if (channelIsActive) {
1✔
371
      checkState(nameResolverStarted, "nameResolver is not started");
1✔
372
      checkState(lbHelper != null, "lbHelper is null");
1✔
373
    }
374
    if (nameResolver != null) {
1✔
375
      nameResolver.shutdown();
1✔
376
      nameResolverStarted = false;
1✔
377
      if (channelIsActive) {
1✔
378
        nameResolver = getNameResolver(
1✔
379
            target, authorityOverride, nameResolverFactory, nameResolverArgs);
380
      } else {
381
        nameResolver = null;
1✔
382
      }
383
    }
384
    if (lbHelper != null) {
1✔
385
      lbHelper.lb.shutdown();
1✔
386
      lbHelper = null;
1✔
387
    }
388
    subchannelPicker = null;
1✔
389
  }
1✔
390

391
  /**
392
   * Make the channel exit idle mode, if it's in it.
393
   *
394
   * <p>Must be called from syncContext
395
   */
396
  @VisibleForTesting
397
  void exitIdleMode() {
398
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
399
    if (shutdown.get() || panicMode) {
1✔
400
      return;
1✔
401
    }
402
    if (inUseStateAggregator.isInUse()) {
1✔
403
      // Cancel the timer now, so that a racing due timer will not put Channel on idleness
404
      // when the caller of exitIdleMode() is about to use the returned loadBalancer.
405
      cancelIdleTimer(false);
1✔
406
    } else {
407
      // exitIdleMode() may be called outside of inUseStateAggregator.handleNotInUse() while
408
      // isInUse() == false, in which case we still need to schedule the timer.
409
      rescheduleIdleTimer();
1✔
410
    }
411
    if (lbHelper != null) {
1✔
412
      return;
1✔
413
    }
414
    channelLogger.log(ChannelLogLevel.INFO, "Exiting idle mode");
1✔
415
    LbHelperImpl lbHelper = new LbHelperImpl();
1✔
416
    lbHelper.lb = loadBalancerFactory.newLoadBalancer(lbHelper);
1✔
417
    // Delay setting lbHelper until fully initialized, since loadBalancerFactory is user code and
418
    // may throw. We don't want to confuse our state, even if we will enter panic mode.
419
    this.lbHelper = lbHelper;
1✔
420

421
    NameResolverListener listener = new NameResolverListener(lbHelper, nameResolver);
1✔
422
    nameResolver.start(listener);
1✔
423
    nameResolverStarted = true;
1✔
424
  }
1✔
425

426
  // Must be run from syncContext
427
  private void enterIdleMode() {
428
    // nameResolver and loadBalancer are guaranteed to be non-null.  If any of them were null,
429
    // either the idleModeTimer ran twice without exiting the idle mode, or the task in shutdown()
430
    // did not cancel idleModeTimer, or enterIdle() ran while shutdown or in idle, all of
431
    // which are bugs.
432
    shutdownNameResolverAndLoadBalancer(true);
1✔
433
    delayedTransport.reprocess(null);
1✔
434
    channelLogger.log(ChannelLogLevel.INFO, "Entering IDLE state");
1✔
435
    channelStateManager.gotoState(IDLE);
1✔
436
    // If the inUseStateAggregator still considers pending calls to be queued up or the delayed
437
    // transport to be holding some we need to exit idle mode to give these calls a chance to
438
    // be processed.
439
    if (inUseStateAggregator.anyObjectInUse(pendingCallsInUseObject, delayedTransport)) {
1✔
440
      exitIdleMode();
1✔
441
    }
442
  }
1✔
443

444
  // Must be run from syncContext
445
  private void cancelIdleTimer(boolean permanent) {
446
    idleTimer.cancel(permanent);
1✔
447
  }
1✔
448

449
  // Always run from syncContext
450
  private void rescheduleIdleTimer() {
451
    if (idleTimeoutMillis == IDLE_TIMEOUT_MILLIS_DISABLE) {
1✔
452
      return;
1✔
453
    }
454
    idleTimer.reschedule(idleTimeoutMillis, TimeUnit.MILLISECONDS);
1✔
455
  }
1✔
456

457
  /**
458
   * Force name resolution refresh to happen immediately. Must be run
459
   * from syncContext.
460
   */
461
  private void refreshNameResolution() {
462
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
463
    if (nameResolverStarted) {
1✔
464
      nameResolver.refresh();
1✔
465
    }
466
  }
1✔
467

468
  private final class ChannelStreamProvider implements ClientStreamProvider {
1✔
469
    volatile Throttle throttle;
470

471
    private ClientTransport getTransport(PickSubchannelArgs args) {
472
      SubchannelPicker pickerCopy = subchannelPicker;
1✔
473
      if (shutdown.get()) {
1✔
474
        // If channel is shut down, delayedTransport is also shut down which will fail the stream
475
        // properly.
476
        return delayedTransport;
1✔
477
      }
478
      if (pickerCopy == null) {
1✔
479
        final class ExitIdleModeForTransport implements Runnable {
1✔
480
          @Override
481
          public void run() {
482
            exitIdleMode();
1✔
483
          }
1✔
484
        }
485

486
        syncContext.execute(new ExitIdleModeForTransport());
1✔
487
        return delayedTransport;
1✔
488
      }
489
      // There is no need to reschedule the idle timer here.
490
      //
491
      // pickerCopy != null, which means idle timer has not expired when this method starts.
492
      // Even if idle timer expires right after we grab pickerCopy, and it shuts down LoadBalancer
493
      // which calls Subchannel.shutdown(), the InternalSubchannel will be actually shutdown after
494
      // SUBCHANNEL_SHUTDOWN_DELAY_SECONDS, which gives the caller time to start RPC on it.
495
      //
496
      // In most cases the idle timer is scheduled to fire after the transport has created the
497
      // stream, which would have reported in-use state to the channel that would have cancelled
498
      // the idle timer.
499
      PickResult pickResult = pickerCopy.pickSubchannel(args);
1✔
500
      ClientTransport transport = GrpcUtil.getTransportFromPickResult(
1✔
501
          pickResult, args.getCallOptions().isWaitForReady());
1✔
502
      if (transport != null) {
1✔
503
        return transport;
1✔
504
      }
505
      return delayedTransport;
1✔
506
    }
507

508
    @Override
509
    public ClientStream newStream(
510
        final MethodDescriptor<?, ?> method,
511
        final CallOptions callOptions,
512
        final Metadata headers,
513
        final Context context) {
514
      if (!retryEnabled) {
1✔
515
        ClientTransport transport =
1✔
516
            getTransport(new PickSubchannelArgsImpl(method, headers, callOptions));
1✔
517
        Context origContext = context.attach();
1✔
518
        ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers(
1✔
519
            callOptions, headers, 0, /* isTransparentRetry= */ false);
520
        try {
521
          return transport.newStream(method, headers, callOptions, tracers);
1✔
522
        } finally {
523
          context.detach(origContext);
1✔
524
        }
525
      } else {
526
        MethodInfo methodInfo = callOptions.getOption(MethodInfo.KEY);
1✔
527
        final RetryPolicy retryPolicy = methodInfo == null ? null : methodInfo.retryPolicy;
1✔
528
        final HedgingPolicy hedgingPolicy = methodInfo == null ? null : methodInfo.hedgingPolicy;
1✔
529
        final class RetryStream<ReqT> extends RetriableStream<ReqT> {
530
          @SuppressWarnings("unchecked")
531
          RetryStream() {
1✔
532
            super(
1✔
533
                (MethodDescriptor<ReqT, ?>) method,
534
                headers,
535
                channelBufferUsed,
1✔
536
                perRpcBufferLimit,
1✔
537
                channelBufferLimit,
1✔
538
                getCallExecutor(callOptions),
1✔
539
                transportFactory.getScheduledExecutorService(),
1✔
540
                retryPolicy,
541
                hedgingPolicy,
542
                throttle);
543
          }
1✔
544

545
          @Override
546
          Status prestart() {
547
            return uncommittedRetriableStreamsRegistry.add(this);
1✔
548
          }
549

550
          @Override
551
          void postCommit() {
552
            uncommittedRetriableStreamsRegistry.remove(this);
1✔
553
          }
1✔
554

555
          @Override
556
          ClientStream newSubstream(
557
              Metadata newHeaders, ClientStreamTracer.Factory factory, int previousAttempts,
558
              boolean isTransparentRetry) {
559
            CallOptions newOptions = callOptions.withStreamTracerFactory(factory);
1✔
560
            ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers(
1✔
561
                newOptions, newHeaders, previousAttempts, isTransparentRetry);
562
            ClientTransport transport =
1✔
563
                getTransport(new PickSubchannelArgsImpl(method, newHeaders, newOptions));
1✔
564
            Context origContext = context.attach();
1✔
565
            try {
566
              return transport.newStream(method, newHeaders, newOptions, tracers);
1✔
567
            } finally {
568
              context.detach(origContext);
1✔
569
            }
570
          }
571
        }
572

573
        return new RetryStream<>();
1✔
574
      }
575
    }
576
  }
577

578
  private final ChannelStreamProvider transportProvider = new ChannelStreamProvider();
1✔
579

580
  private final Rescheduler idleTimer;
581

582
  ManagedChannelImpl(
583
      ManagedChannelImplBuilder builder,
584
      ClientTransportFactory clientTransportFactory,
585
      BackoffPolicy.Provider backoffPolicyProvider,
586
      ObjectPool<? extends Executor> balancerRpcExecutorPool,
587
      Supplier<Stopwatch> stopwatchSupplier,
588
      List<ClientInterceptor> interceptors,
589
      final TimeProvider timeProvider) {
1✔
590
    this.target = checkNotNull(builder.target, "target");
1✔
591
    this.logId = InternalLogId.allocate("Channel", target);
1✔
592
    this.timeProvider = checkNotNull(timeProvider, "timeProvider");
1✔
593
    this.executorPool = checkNotNull(builder.executorPool, "executorPool");
1✔
594
    this.executor = checkNotNull(executorPool.getObject(), "executor");
1✔
595
    this.originalChannelCreds = builder.channelCredentials;
1✔
596
    this.originalTransportFactory = clientTransportFactory;
1✔
597
    this.offloadExecutorHolder =
1✔
598
        new ExecutorHolder(checkNotNull(builder.offloadExecutorPool, "offloadExecutorPool"));
1✔
599
    this.transportFactory = new CallCredentialsApplyingTransportFactory(
1✔
600
        clientTransportFactory, builder.callCredentials, this.offloadExecutorHolder);
601
    this.oobTransportFactory = new CallCredentialsApplyingTransportFactory(
1✔
602
        clientTransportFactory, null, this.offloadExecutorHolder);
603
    this.scheduledExecutor =
1✔
604
        new RestrictedScheduledExecutor(transportFactory.getScheduledExecutorService());
1✔
605
    maxTraceEvents = builder.maxTraceEvents;
1✔
606
    channelTracer = new ChannelTracer(
1✔
607
        logId, builder.maxTraceEvents, timeProvider.currentTimeNanos(),
1✔
608
        "Channel for '" + target + "'");
609
    channelLogger = new ChannelLoggerImpl(channelTracer, timeProvider);
1✔
610
    ProxyDetector proxyDetector =
611
        builder.proxyDetector != null ? builder.proxyDetector : GrpcUtil.DEFAULT_PROXY_DETECTOR;
1✔
612
    this.retryEnabled = builder.retryEnabled;
1✔
613
    this.loadBalancerFactory = new AutoConfiguredLoadBalancerFactory(builder.defaultLbPolicy);
1✔
614
    this.nameResolverRegistry = builder.nameResolverRegistry;
1✔
615
    ScParser serviceConfigParser =
1✔
616
        new ScParser(
617
            retryEnabled,
618
            builder.maxRetryAttempts,
619
            builder.maxHedgedAttempts,
620
            loadBalancerFactory);
621
    this.authorityOverride = builder.authorityOverride;
1✔
622
    this.nameResolverArgs =
1✔
623
        NameResolver.Args.newBuilder()
1✔
624
            .setDefaultPort(builder.getDefaultPort())
1✔
625
            .setProxyDetector(proxyDetector)
1✔
626
            .setSynchronizationContext(syncContext)
1✔
627
            .setScheduledExecutorService(scheduledExecutor)
1✔
628
            .setServiceConfigParser(serviceConfigParser)
1✔
629
            .setChannelLogger(channelLogger)
1✔
630
            .setOffloadExecutor(this.offloadExecutorHolder)
1✔
631
            .setOverrideAuthority(this.authorityOverride)
1✔
632
            .build();
1✔
633
    this.nameResolverFactory = builder.nameResolverFactory;
1✔
634
    this.nameResolver = getNameResolver(
1✔
635
        target, authorityOverride, nameResolverFactory, nameResolverArgs);
636
    this.balancerRpcExecutorPool = checkNotNull(balancerRpcExecutorPool, "balancerRpcExecutorPool");
1✔
637
    this.balancerRpcExecutorHolder = new ExecutorHolder(balancerRpcExecutorPool);
1✔
638
    this.delayedTransport = new DelayedClientTransport(this.executor, this.syncContext);
1✔
639
    this.delayedTransport.start(delayedTransportListener);
1✔
640
    this.backoffPolicyProvider = backoffPolicyProvider;
1✔
641

642
    if (builder.defaultServiceConfig != null) {
1✔
643
      ConfigOrError parsedDefaultServiceConfig =
1✔
644
          serviceConfigParser.parseServiceConfig(builder.defaultServiceConfig);
1✔
645
      checkState(
1✔
646
          parsedDefaultServiceConfig.getError() == null,
1✔
647
          "Default config is invalid: %s",
648
          parsedDefaultServiceConfig.getError());
1✔
649
      this.defaultServiceConfig =
1✔
650
          (ManagedChannelServiceConfig) parsedDefaultServiceConfig.getConfig();
1✔
651
      this.lastServiceConfig = this.defaultServiceConfig;
1✔
652
    } else {
1✔
653
      this.defaultServiceConfig = null;
1✔
654
    }
655
    this.lookUpServiceConfig = builder.lookUpServiceConfig;
1✔
656
    realChannel = new RealChannel(nameResolver.getServiceAuthority());
1✔
657
    Channel channel = realChannel;
1✔
658
    if (builder.binlog != null) {
1✔
659
      channel = builder.binlog.wrapChannel(channel);
1✔
660
    }
661
    this.interceptorChannel = ClientInterceptors.intercept(channel, interceptors);
1✔
662
    this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier");
1✔
663
    if (builder.idleTimeoutMillis == IDLE_TIMEOUT_MILLIS_DISABLE) {
1✔
664
      this.idleTimeoutMillis = builder.idleTimeoutMillis;
1✔
665
    } else {
666
      checkArgument(
1✔
667
          builder.idleTimeoutMillis
668
              >= ManagedChannelImplBuilder.IDLE_MODE_MIN_TIMEOUT_MILLIS,
669
          "invalid idleTimeoutMillis %s", builder.idleTimeoutMillis);
670
      this.idleTimeoutMillis = builder.idleTimeoutMillis;
1✔
671
    }
672

673
    idleTimer = new Rescheduler(
1✔
674
        new IdleModeTimer(),
675
        syncContext,
676
        transportFactory.getScheduledExecutorService(),
1✔
677
        stopwatchSupplier.get());
1✔
678
    this.fullStreamDecompression = builder.fullStreamDecompression;
1✔
679
    this.decompressorRegistry = checkNotNull(builder.decompressorRegistry, "decompressorRegistry");
1✔
680
    this.compressorRegistry = checkNotNull(builder.compressorRegistry, "compressorRegistry");
1✔
681
    this.userAgent = builder.userAgent;
1✔
682

683
    this.channelBufferLimit = builder.retryBufferSize;
1✔
684
    this.perRpcBufferLimit = builder.perRpcBufferLimit;
1✔
685
    final class ChannelCallTracerFactory implements CallTracer.Factory {
1✔
686
      @Override
687
      public CallTracer create() {
688
        return new CallTracer(timeProvider);
1✔
689
      }
690
    }
691

692
    this.callTracerFactory = new ChannelCallTracerFactory();
1✔
693
    channelCallTracer = callTracerFactory.create();
1✔
694
    this.channelz = checkNotNull(builder.channelz);
1✔
695
    channelz.addRootChannel(this);
1✔
696

697
    if (!lookUpServiceConfig) {
1✔
698
      if (defaultServiceConfig != null) {
1✔
699
        channelLogger.log(
1✔
700
            ChannelLogLevel.INFO, "Service config look-up disabled, using default service config");
701
      }
702
      serviceConfigUpdated = true;
1✔
703
    }
704
  }
1✔
705

706
  private static NameResolver getNameResolver(
707
      String target, NameResolver.Factory nameResolverFactory, NameResolver.Args nameResolverArgs) {
708
    // Finding a NameResolver. Try using the target string as the URI. If that fails, try prepending
709
    // "dns:///".
710
    URI targetUri = null;
1✔
711
    StringBuilder uriSyntaxErrors = new StringBuilder();
1✔
712
    try {
713
      targetUri = new URI(target);
1✔
714
      // For "localhost:8080" this would likely cause newNameResolver to return null, because
715
      // "localhost" is parsed as the scheme. Will fall into the next branch and try
716
      // "dns:///localhost:8080".
717
    } catch (URISyntaxException e) {
1✔
718
      // Can happen with ip addresses like "[::1]:1234" or 127.0.0.1:1234.
719
      uriSyntaxErrors.append(e.getMessage());
1✔
720
    }
1✔
721
    if (targetUri != null) {
1✔
722
      NameResolver resolver = nameResolverFactory.newNameResolver(targetUri, nameResolverArgs);
1✔
723
      if (resolver != null) {
1✔
724
        return resolver;
1✔
725
      }
726
      // "foo.googleapis.com:8080" cause resolver to be null, because "foo.googleapis.com" is an
727
      // unmapped scheme. Just fall through and will try "dns:///foo.googleapis.com:8080"
728
    }
729

730
    // If we reached here, the targetUri couldn't be used.
731
    if (!URI_PATTERN.matcher(target).matches()) {
1✔
732
      // It doesn't look like a URI target. Maybe it's an authority string. Try with the default
733
      // scheme from the factory.
734
      try {
735
        targetUri = new URI(nameResolverFactory.getDefaultScheme(), "", "/" + target, null);
1✔
736
      } catch (URISyntaxException e) {
×
737
        // Should not be possible.
738
        throw new IllegalArgumentException(e);
×
739
      }
1✔
740
      NameResolver resolver = nameResolverFactory.newNameResolver(targetUri, nameResolverArgs);
1✔
741
      if (resolver != null) {
1✔
742
        return resolver;
1✔
743
      }
744
    }
745
    throw new IllegalArgumentException(String.format(
1✔
746
        "cannot find a NameResolver for %s%s",
747
        target, uriSyntaxErrors.length() > 0 ? " (" + uriSyntaxErrors + ")" : ""));
1✔
748
  }
749

750
  @VisibleForTesting
751
  static NameResolver getNameResolver(
752
      String target, @Nullable final String overrideAuthority,
753
      NameResolver.Factory nameResolverFactory, NameResolver.Args nameResolverArgs) {
754
    NameResolver resolver = getNameResolver(target, nameResolverFactory, nameResolverArgs);
1✔
755

756
    // We wrap the name resolver in a RetryingNameResolver to give it the ability to retry failures.
757
    // TODO: After a transition period, all NameResolver implementations that need retry should use
758
    //       RetryingNameResolver directly and this step can be removed.
759
    NameResolver usedNameResolver = new RetryingNameResolver(resolver,
1✔
760
          new BackoffPolicyRetryScheduler(new ExponentialBackoffPolicy.Provider(),
761
              nameResolverArgs.getScheduledExecutorService(),
1✔
762
              nameResolverArgs.getSynchronizationContext()),
1✔
763
          nameResolverArgs.getSynchronizationContext());
1✔
764

765
    if (overrideAuthority == null) {
1✔
766
      return usedNameResolver;
1✔
767
    }
768

769
    return new ForwardingNameResolver(usedNameResolver) {
1✔
770
      @Override
771
      public String getServiceAuthority() {
772
        return overrideAuthority;
1✔
773
      }
774
    };
775
  }
776

777
  @VisibleForTesting
778
  InternalConfigSelector getConfigSelector() {
779
    return realChannel.configSelector.get();
1✔
780
  }
781

782
  /**
783
   * Initiates an orderly shutdown in which preexisting calls continue but new calls are immediately
784
   * cancelled.
785
   */
786
  @Override
787
  public ManagedChannelImpl shutdown() {
788
    channelLogger.log(ChannelLogLevel.DEBUG, "shutdown() called");
1✔
789
    if (!shutdown.compareAndSet(false, true)) {
1✔
790
      return this;
1✔
791
    }
792
    final class Shutdown implements Runnable {
1✔
793
      @Override
794
      public void run() {
795
        channelLogger.log(ChannelLogLevel.INFO, "Entering SHUTDOWN state");
1✔
796
        channelStateManager.gotoState(SHUTDOWN);
1✔
797
      }
1✔
798
    }
799

800
    syncContext.execute(new Shutdown());
1✔
801
    realChannel.shutdown();
1✔
802
    final class CancelIdleTimer implements Runnable {
1✔
803
      @Override
804
      public void run() {
805
        cancelIdleTimer(/* permanent= */ true);
1✔
806
      }
1✔
807
    }
808

809
    syncContext.execute(new CancelIdleTimer());
1✔
810
    return this;
1✔
811
  }
812

813
  /**
814
   * Initiates a forceful shutdown in which preexisting and new calls are cancelled. Although
815
   * forceful, the shutdown process is still not instantaneous; {@link #isTerminated()} will likely
816
   * return {@code false} immediately after this method returns.
817
   */
818
  @Override
819
  public ManagedChannelImpl shutdownNow() {
820
    channelLogger.log(ChannelLogLevel.DEBUG, "shutdownNow() called");
1✔
821
    shutdown();
1✔
822
    realChannel.shutdownNow();
1✔
823
    final class ShutdownNow implements Runnable {
1✔
824
      @Override
825
      public void run() {
826
        if (shutdownNowed) {
1✔
827
          return;
1✔
828
        }
829
        shutdownNowed = true;
1✔
830
        maybeShutdownNowSubchannels();
1✔
831
      }
1✔
832
    }
833

834
    syncContext.execute(new ShutdownNow());
1✔
835
    return this;
1✔
836
  }
837

838
  // Called from syncContext
839
  @VisibleForTesting
840
  void panic(final Throwable t) {
841
    if (panicMode) {
1✔
842
      // Preserve the first panic information
843
      return;
×
844
    }
845
    panicMode = true;
1✔
846
    cancelIdleTimer(/* permanent= */ true);
1✔
847
    shutdownNameResolverAndLoadBalancer(false);
1✔
848
    final class PanicSubchannelPicker extends SubchannelPicker {
1✔
849
      private final PickResult panicPickResult =
1✔
850
          PickResult.withDrop(
1✔
851
              Status.INTERNAL.withDescription("Panic! This is a bug!").withCause(t));
1✔
852

853
      @Override
854
      public PickResult pickSubchannel(PickSubchannelArgs args) {
855
        return panicPickResult;
1✔
856
      }
857

858
      @Override
859
      public String toString() {
860
        return MoreObjects.toStringHelper(PanicSubchannelPicker.class)
×
861
            .add("panicPickResult", panicPickResult)
×
862
            .toString();
×
863
      }
864
    }
865

866
    updateSubchannelPicker(new PanicSubchannelPicker());
1✔
867
    realChannel.updateConfigSelector(null);
1✔
868
    channelLogger.log(ChannelLogLevel.ERROR, "PANIC! Entering TRANSIENT_FAILURE");
1✔
869
    channelStateManager.gotoState(TRANSIENT_FAILURE);
1✔
870
  }
1✔
871

872
  @VisibleForTesting
873
  boolean isInPanicMode() {
874
    return panicMode;
1✔
875
  }
876

877
  // Called from syncContext
878
  private void updateSubchannelPicker(SubchannelPicker newPicker) {
879
    subchannelPicker = newPicker;
1✔
880
    delayedTransport.reprocess(newPicker);
1✔
881
  }
1✔
882

883
  @Override
884
  public boolean isShutdown() {
885
    return shutdown.get();
1✔
886
  }
887

888
  @Override
889
  public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
890
    return terminatedLatch.await(timeout, unit);
1✔
891
  }
892

893
  @Override
894
  public boolean isTerminated() {
895
    return terminated;
1✔
896
  }
897

898
  /*
899
   * Creates a new outgoing call on the channel.
900
   */
901
  @Override
902
  public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(MethodDescriptor<ReqT, RespT> method,
903
      CallOptions callOptions) {
904
    return interceptorChannel.newCall(method, callOptions);
1✔
905
  }
906

907
  @Override
908
  public String authority() {
909
    return interceptorChannel.authority();
1✔
910
  }
911

912
  private Executor getCallExecutor(CallOptions callOptions) {
913
    Executor executor = callOptions.getExecutor();
1✔
914
    if (executor == null) {
1✔
915
      executor = this.executor;
1✔
916
    }
917
    return executor;
1✔
918
  }
919

920
  private class RealChannel extends Channel {
921
    // Reference to null if no config selector is available from resolution result
922
    // Reference must be set() from syncContext
923
    private final AtomicReference<InternalConfigSelector> configSelector =
1✔
924
        new AtomicReference<>(INITIAL_PENDING_SELECTOR);
1✔
925
    // Set when the NameResolver is initially created. When we create a new NameResolver for the
926
    // same target, the new instance must have the same value.
927
    private final String authority;
928

929
    private final Channel clientCallImplChannel = new Channel() {
1✔
930
      @Override
931
      public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
932
          MethodDescriptor<RequestT, ResponseT> method, CallOptions callOptions) {
933
        return new ClientCallImpl<>(
1✔
934
            method,
935
            getCallExecutor(callOptions),
1✔
936
            callOptions,
937
            transportProvider,
1✔
938
            terminated ? null : transportFactory.getScheduledExecutorService(),
1✔
939
            channelCallTracer,
1✔
940
            null)
941
            .setFullStreamDecompression(fullStreamDecompression)
1✔
942
            .setDecompressorRegistry(decompressorRegistry)
1✔
943
            .setCompressorRegistry(compressorRegistry);
1✔
944
      }
945

946
      @Override
947
      public String authority() {
948
        return authority;
×
949
      }
950
    };
951

952
    private RealChannel(String authority) {
1✔
953
      this.authority =  checkNotNull(authority, "authority");
1✔
954
    }
1✔
955

956
    @Override
957
    public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(
958
        MethodDescriptor<ReqT, RespT> method, CallOptions callOptions) {
959
      if (configSelector.get() != INITIAL_PENDING_SELECTOR) {
1✔
960
        return newClientCall(method, callOptions);
1✔
961
      }
962
      syncContext.execute(new Runnable() {
1✔
963
        @Override
964
        public void run() {
965
          exitIdleMode();
1✔
966
        }
1✔
967
      });
968
      if (configSelector.get() != INITIAL_PENDING_SELECTOR) {
1✔
969
        // This is an optimization for the case (typically with InProcessTransport) when name
970
        // resolution result is immediately available at this point. Otherwise, some users'
971
        // tests might observe slight behavior difference from earlier grpc versions.
972
        return newClientCall(method, callOptions);
1✔
973
      }
974
      if (shutdown.get()) {
1✔
975
        // Return a failing ClientCall.
976
        return new ClientCall<ReqT, RespT>() {
×
977
          @Override
978
          public void start(Listener<RespT> responseListener, Metadata headers) {
979
            responseListener.onClose(SHUTDOWN_STATUS, new Metadata());
×
980
          }
×
981

982
          @Override public void request(int numMessages) {}
×
983

984
          @Override public void cancel(@Nullable String message, @Nullable Throwable cause) {}
×
985

986
          @Override public void halfClose() {}
×
987

988
          @Override public void sendMessage(ReqT message) {}
×
989
        };
990
      }
991
      Context context = Context.current();
1✔
992
      final PendingCall<ReqT, RespT> pendingCall = new PendingCall<>(context, method, callOptions);
1✔
993
      syncContext.execute(new Runnable() {
1✔
994
        @Override
995
        public void run() {
996
          if (configSelector.get() == INITIAL_PENDING_SELECTOR) {
1✔
997
            if (pendingCalls == null) {
1✔
998
              pendingCalls = new LinkedHashSet<>();
1✔
999
              inUseStateAggregator.updateObjectInUse(pendingCallsInUseObject, true);
1✔
1000
            }
1001
            pendingCalls.add(pendingCall);
1✔
1002
          } else {
1003
            pendingCall.reprocess();
1✔
1004
          }
1005
        }
1✔
1006
      });
1007
      return pendingCall;
1✔
1008
    }
1009

1010
    // Must run in SynchronizationContext.
1011
    void updateConfigSelector(@Nullable InternalConfigSelector config) {
1012
      InternalConfigSelector prevConfig = configSelector.get();
1✔
1013
      configSelector.set(config);
1✔
1014
      if (prevConfig == INITIAL_PENDING_SELECTOR && pendingCalls != null) {
1✔
1015
        for (RealChannel.PendingCall<?, ?> pendingCall : pendingCalls) {
1✔
1016
          pendingCall.reprocess();
1✔
1017
        }
1✔
1018
      }
1019
    }
1✔
1020

1021
    // Must run in SynchronizationContext.
1022
    void onConfigError() {
1023
      if (configSelector.get() == INITIAL_PENDING_SELECTOR) {
1✔
1024
        updateConfigSelector(null);
1✔
1025
      }
1026
    }
1✔
1027

1028
    void shutdown() {
1029
      final class RealChannelShutdown implements Runnable {
1✔
1030
        @Override
1031
        public void run() {
1032
          if (pendingCalls == null) {
1✔
1033
            if (configSelector.get() == INITIAL_PENDING_SELECTOR) {
1✔
1034
              configSelector.set(null);
1✔
1035
            }
1036
            uncommittedRetriableStreamsRegistry.onShutdown(SHUTDOWN_STATUS);
1✔
1037
          }
1038
        }
1✔
1039
      }
1040

1041
      syncContext.execute(new RealChannelShutdown());
1✔
1042
    }
1✔
1043

1044
    void shutdownNow() {
1045
      final class RealChannelShutdownNow implements Runnable {
1✔
1046
        @Override
1047
        public void run() {
1048
          if (configSelector.get() == INITIAL_PENDING_SELECTOR) {
1✔
1049
            configSelector.set(null);
1✔
1050
          }
1051
          if (pendingCalls != null) {
1✔
1052
            for (RealChannel.PendingCall<?, ?> pendingCall : pendingCalls) {
1✔
1053
              pendingCall.cancel("Channel is forcefully shutdown", null);
1✔
1054
            }
1✔
1055
          }
1056
          uncommittedRetriableStreamsRegistry.onShutdownNow(SHUTDOWN_NOW_STATUS);
1✔
1057
        }
1✔
1058
      }
1059

1060
      syncContext.execute(new RealChannelShutdownNow());
1✔
1061
    }
1✔
1062

1063
    @Override
1064
    public String authority() {
1065
      return authority;
1✔
1066
    }
1067

1068
    private final class PendingCall<ReqT, RespT> extends DelayedClientCall<ReqT, RespT> {
1069
      final Context context;
1070
      final MethodDescriptor<ReqT, RespT> method;
1071
      final CallOptions callOptions;
1072
      private final long callCreationTime;
1073

1074
      PendingCall(
1075
          Context context, MethodDescriptor<ReqT, RespT> method, CallOptions callOptions) {
1✔
1076
        super(getCallExecutor(callOptions), scheduledExecutor, callOptions.getDeadline());
1✔
1077
        this.context = context;
1✔
1078
        this.method = method;
1✔
1079
        this.callOptions = callOptions;
1✔
1080
        this.callCreationTime = ticker.nanoTime();
1✔
1081
      }
1✔
1082

1083
      /** Called when it's ready to create a real call and reprocess the pending call. */
1084
      void reprocess() {
1085
        ClientCall<ReqT, RespT> realCall;
1086
        Context previous = context.attach();
1✔
1087
        try {
1088
          CallOptions delayResolutionOption = callOptions.withOption(NAME_RESOLUTION_DELAYED,
1✔
1089
              ticker.nanoTime() - callCreationTime);
1✔
1090
          realCall = newClientCall(method, delayResolutionOption);
1✔
1091
        } finally {
1092
          context.detach(previous);
1✔
1093
        }
1094
        Runnable toRun = setCall(realCall);
1✔
1095
        if (toRun == null) {
1✔
1096
          syncContext.execute(new PendingCallRemoval());
1✔
1097
        } else {
1098
          getCallExecutor(callOptions).execute(new Runnable() {
1✔
1099
            @Override
1100
            public void run() {
1101
              toRun.run();
1✔
1102
              syncContext.execute(new PendingCallRemoval());
1✔
1103
            }
1✔
1104
          });
1105
        }
1106
      }
1✔
1107

1108
      @Override
1109
      protected void callCancelled() {
1110
        super.callCancelled();
1✔
1111
        syncContext.execute(new PendingCallRemoval());
1✔
1112
      }
1✔
1113

1114
      final class PendingCallRemoval implements Runnable {
1✔
1115
        @Override
1116
        public void run() {
1117
          if (pendingCalls != null) {
1✔
1118
            pendingCalls.remove(PendingCall.this);
1✔
1119
            if (pendingCalls.isEmpty()) {
1✔
1120
              inUseStateAggregator.updateObjectInUse(pendingCallsInUseObject, false);
1✔
1121
              pendingCalls = null;
1✔
1122
              if (shutdown.get()) {
1✔
1123
                uncommittedRetriableStreamsRegistry.onShutdown(SHUTDOWN_STATUS);
1✔
1124
              }
1125
            }
1126
          }
1127
        }
1✔
1128
      }
1129
    }
1130

1131
    private <ReqT, RespT> ClientCall<ReqT, RespT> newClientCall(
1132
        MethodDescriptor<ReqT, RespT> method, CallOptions callOptions) {
1133
      InternalConfigSelector selector = configSelector.get();
1✔
1134
      if (selector == null) {
1✔
1135
        return clientCallImplChannel.newCall(method, callOptions);
1✔
1136
      }
1137
      if (selector instanceof ServiceConfigConvertedSelector) {
1✔
1138
        MethodInfo methodInfo =
1✔
1139
            ((ServiceConfigConvertedSelector) selector).config.getMethodConfig(method);
1✔
1140
        if (methodInfo != null) {
1✔
1141
          callOptions = callOptions.withOption(MethodInfo.KEY, methodInfo);
1✔
1142
        }
1143
        return clientCallImplChannel.newCall(method, callOptions);
1✔
1144
      }
1145
      return new ConfigSelectingClientCall<>(
1✔
1146
          selector, clientCallImplChannel, executor, method, callOptions);
1✔
1147
    }
1148
  }
1149

1150
  /**
1151
   * A client call for a given channel that applies a given config selector when it starts.
1152
   */
1153
  static final class ConfigSelectingClientCall<ReqT, RespT>
1154
      extends ForwardingClientCall<ReqT, RespT> {
1155

1156
    private final InternalConfigSelector configSelector;
1157
    private final Channel channel;
1158
    private final Executor callExecutor;
1159
    private final MethodDescriptor<ReqT, RespT> method;
1160
    private final Context context;
1161
    private CallOptions callOptions;
1162

1163
    private ClientCall<ReqT, RespT> delegate;
1164

1165
    ConfigSelectingClientCall(
1166
        InternalConfigSelector configSelector, Channel channel, Executor channelExecutor,
1167
        MethodDescriptor<ReqT, RespT> method,
1168
        CallOptions callOptions) {
1✔
1169
      this.configSelector = configSelector;
1✔
1170
      this.channel = channel;
1✔
1171
      this.method = method;
1✔
1172
      this.callExecutor =
1✔
1173
          callOptions.getExecutor() == null ? channelExecutor : callOptions.getExecutor();
1✔
1174
      this.callOptions = callOptions.withExecutor(callExecutor);
1✔
1175
      this.context = Context.current();
1✔
1176
    }
1✔
1177

1178
    @Override
1179
    protected ClientCall<ReqT, RespT> delegate() {
1180
      return delegate;
1✔
1181
    }
1182

1183
    @SuppressWarnings("unchecked")
1184
    @Override
1185
    public void start(Listener<RespT> observer, Metadata headers) {
1186
      PickSubchannelArgs args = new PickSubchannelArgsImpl(method, headers, callOptions);
1✔
1187
      InternalConfigSelector.Result result = configSelector.selectConfig(args);
1✔
1188
      Status status = result.getStatus();
1✔
1189
      if (!status.isOk()) {
1✔
1190
        executeCloseObserverInContext(observer,
1✔
1191
            GrpcUtil.replaceInappropriateControlPlaneStatus(status));
1✔
1192
        delegate = (ClientCall<ReqT, RespT>) NOOP_CALL;
1✔
1193
        return;
1✔
1194
      }
1195
      ClientInterceptor interceptor = result.getInterceptor();
1✔
1196
      ManagedChannelServiceConfig config = (ManagedChannelServiceConfig) result.getConfig();
1✔
1197
      MethodInfo methodInfo = config.getMethodConfig(method);
1✔
1198
      if (methodInfo != null) {
1✔
1199
        callOptions = callOptions.withOption(MethodInfo.KEY, methodInfo);
1✔
1200
      }
1201
      if (interceptor != null) {
1✔
1202
        delegate = interceptor.interceptCall(method, callOptions, channel);
1✔
1203
      } else {
1204
        delegate = channel.newCall(method, callOptions);
×
1205
      }
1206
      delegate.start(observer, headers);
1✔
1207
    }
1✔
1208

1209
    private void executeCloseObserverInContext(
1210
        final Listener<RespT> observer, final Status status) {
1211
      class CloseInContext extends ContextRunnable {
1212
        CloseInContext() {
1✔
1213
          super(context);
1✔
1214
        }
1✔
1215

1216
        @Override
1217
        public void runInContext() {
1218
          observer.onClose(status, new Metadata());
1✔
1219
        }
1✔
1220
      }
1221

1222
      callExecutor.execute(new CloseInContext());
1✔
1223
    }
1✔
1224

1225
    @Override
1226
    public void cancel(@Nullable String message, @Nullable Throwable cause) {
1227
      if (delegate != null) {
×
1228
        delegate.cancel(message, cause);
×
1229
      }
1230
    }
×
1231
  }
1232

1233
  private static final ClientCall<Object, Object> NOOP_CALL = new ClientCall<Object, Object>() {
1✔
1234
    @Override
1235
    public void start(Listener<Object> responseListener, Metadata headers) {}
×
1236

1237
    @Override
1238
    public void request(int numMessages) {}
1✔
1239

1240
    @Override
1241
    public void cancel(String message, Throwable cause) {}
×
1242

1243
    @Override
1244
    public void halfClose() {}
×
1245

1246
    @Override
1247
    public void sendMessage(Object message) {}
×
1248

1249
    // Always returns {@code false}, since this is only used when the startup of the call fails.
1250
    @Override
1251
    public boolean isReady() {
1252
      return false;
×
1253
    }
1254
  };
1255

1256
  /**
1257
   * Terminate the channel if termination conditions are met.
1258
   */
1259
  // Must be run from syncContext
1260
  private void maybeTerminateChannel() {
1261
    if (terminated) {
1✔
1262
      return;
×
1263
    }
1264
    if (shutdown.get() && subchannels.isEmpty() && oobChannels.isEmpty()) {
1✔
1265
      channelLogger.log(ChannelLogLevel.INFO, "Terminated");
1✔
1266
      channelz.removeRootChannel(this);
1✔
1267
      executorPool.returnObject(executor);
1✔
1268
      balancerRpcExecutorHolder.release();
1✔
1269
      offloadExecutorHolder.release();
1✔
1270
      // Release the transport factory so that it can deallocate any resources.
1271
      transportFactory.close();
1✔
1272

1273
      terminated = true;
1✔
1274
      terminatedLatch.countDown();
1✔
1275
    }
1276
  }
1✔
1277

1278
  // Must be called from syncContext
1279
  private void handleInternalSubchannelState(ConnectivityStateInfo newState) {
1280
    if (newState.getState() == TRANSIENT_FAILURE || newState.getState() == IDLE) {
1✔
1281
      refreshNameResolution();
1✔
1282
    }
1283
  }
1✔
1284

1285
  @Override
1286
  @SuppressWarnings("deprecation")
1287
  public ConnectivityState getState(boolean requestConnection) {
1288
    ConnectivityState savedChannelState = channelStateManager.getState();
1✔
1289
    if (requestConnection && savedChannelState == IDLE) {
1✔
1290
      final class RequestConnection implements Runnable {
1✔
1291
        @Override
1292
        public void run() {
1293
          exitIdleMode();
1✔
1294
          if (subchannelPicker != null) {
1✔
1295
            subchannelPicker.requestConnection();
1✔
1296
          }
1297
          if (lbHelper != null) {
1✔
1298
            lbHelper.lb.requestConnection();
1✔
1299
          }
1300
        }
1✔
1301
      }
1302

1303
      syncContext.execute(new RequestConnection());
1✔
1304
    }
1305
    return savedChannelState;
1✔
1306
  }
1307

1308
  @Override
1309
  public void notifyWhenStateChanged(final ConnectivityState source, final Runnable callback) {
1310
    final class NotifyStateChanged implements Runnable {
1✔
1311
      @Override
1312
      public void run() {
1313
        channelStateManager.notifyWhenStateChanged(callback, executor, source);
1✔
1314
      }
1✔
1315
    }
1316

1317
    syncContext.execute(new NotifyStateChanged());
1✔
1318
  }
1✔
1319

1320
  @Override
1321
  public void resetConnectBackoff() {
1322
    final class ResetConnectBackoff implements Runnable {
1✔
1323
      @Override
1324
      public void run() {
1325
        if (shutdown.get()) {
1✔
1326
          return;
1✔
1327
        }
1328
        if (nameResolverStarted) {
1✔
1329
          refreshNameResolution();
1✔
1330
        }
1331
        for (InternalSubchannel subchannel : subchannels) {
1✔
1332
          subchannel.resetConnectBackoff();
1✔
1333
        }
1✔
1334
        for (OobChannel oobChannel : oobChannels) {
1✔
1335
          oobChannel.resetConnectBackoff();
×
1336
        }
×
1337
      }
1✔
1338
    }
1339

1340
    syncContext.execute(new ResetConnectBackoff());
1✔
1341
  }
1✔
1342

1343
  @Override
1344
  public void enterIdle() {
1345
    final class PrepareToLoseNetworkRunnable implements Runnable {
1✔
1346
      @Override
1347
      public void run() {
1348
        if (shutdown.get() || lbHelper == null) {
1✔
1349
          return;
1✔
1350
        }
1351
        cancelIdleTimer(/* permanent= */ false);
1✔
1352
        enterIdleMode();
1✔
1353
      }
1✔
1354
    }
1355

1356
    syncContext.execute(new PrepareToLoseNetworkRunnable());
1✔
1357
  }
1✔
1358

1359
  /**
1360
   * A registry that prevents channel shutdown from killing existing retry attempts that are in
1361
   * backoff.
1362
   */
1363
  private final class UncommittedRetriableStreamsRegistry {
1✔
1364
    // TODO(zdapeng): This means we would acquire a lock for each new retry-able stream,
1365
    // it's worthwhile to look for a lock-free approach.
1366
    final Object lock = new Object();
1✔
1367

1368
    @GuardedBy("lock")
1✔
1369
    Collection<ClientStream> uncommittedRetriableStreams = new HashSet<>();
1370

1371
    @GuardedBy("lock")
1372
    Status shutdownStatus;
1373

1374
    void onShutdown(Status reason) {
1375
      boolean shouldShutdownDelayedTransport = false;
1✔
1376
      synchronized (lock) {
1✔
1377
        if (shutdownStatus != null) {
1✔
1378
          return;
1✔
1379
        }
1380
        shutdownStatus = reason;
1✔
1381
        // Keep the delayedTransport open until there is no more uncommitted streams, b/c those
1382
        // retriable streams, which may be in backoff and not using any transport, are already
1383
        // started RPCs.
1384
        if (uncommittedRetriableStreams.isEmpty()) {
1✔
1385
          shouldShutdownDelayedTransport = true;
1✔
1386
        }
1387
      }
1✔
1388

1389
      if (shouldShutdownDelayedTransport) {
1✔
1390
        delayedTransport.shutdown(reason);
1✔
1391
      }
1392
    }
1✔
1393

1394
    void onShutdownNow(Status reason) {
1395
      onShutdown(reason);
1✔
1396
      Collection<ClientStream> streams;
1397

1398
      synchronized (lock) {
1✔
1399
        streams = new ArrayList<>(uncommittedRetriableStreams);
1✔
1400
      }
1✔
1401

1402
      for (ClientStream stream : streams) {
1✔
1403
        stream.cancel(reason);
1✔
1404
      }
1✔
1405
      delayedTransport.shutdownNow(reason);
1✔
1406
    }
1✔
1407

1408
    /**
1409
     * Registers a RetriableStream and return null if not shutdown, otherwise just returns the
1410
     * shutdown Status.
1411
     */
1412
    @Nullable
1413
    Status add(RetriableStream<?> retriableStream) {
1414
      synchronized (lock) {
1✔
1415
        if (shutdownStatus != null) {
1✔
1416
          return shutdownStatus;
1✔
1417
        }
1418
        uncommittedRetriableStreams.add(retriableStream);
1✔
1419
        return null;
1✔
1420
      }
1421
    }
1422

1423
    void remove(RetriableStream<?> retriableStream) {
1424
      Status shutdownStatusCopy = null;
1✔
1425

1426
      synchronized (lock) {
1✔
1427
        uncommittedRetriableStreams.remove(retriableStream);
1✔
1428
        if (uncommittedRetriableStreams.isEmpty()) {
1✔
1429
          shutdownStatusCopy = shutdownStatus;
1✔
1430
          // Because retriable transport is long-lived, we take this opportunity to down-size the
1431
          // hashmap.
1432
          uncommittedRetriableStreams = new HashSet<>();
1✔
1433
        }
1434
      }
1✔
1435

1436
      if (shutdownStatusCopy != null) {
1✔
1437
        delayedTransport.shutdown(shutdownStatusCopy);
1✔
1438
      }
1439
    }
1✔
1440
  }
1441

1442
  private final class LbHelperImpl extends LoadBalancer.Helper {
1✔
1443
    AutoConfiguredLoadBalancer lb;
1444

1445
    @Override
1446
    public AbstractSubchannel createSubchannel(CreateSubchannelArgs args) {
1447
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1448
      // No new subchannel should be created after load balancer has been shutdown.
1449
      checkState(!terminating, "Channel is being terminated");
1✔
1450
      return new SubchannelImpl(args);
1✔
1451
    }
1452

1453
    @Override
1454
    public void updateBalancingState(
1455
        final ConnectivityState newState, final SubchannelPicker newPicker) {
1456
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1457
      checkNotNull(newState, "newState");
1✔
1458
      checkNotNull(newPicker, "newPicker");
1✔
1459
      final class UpdateBalancingState implements Runnable {
1✔
1460
        @Override
1461
        public void run() {
1462
          if (LbHelperImpl.this != lbHelper) {
1✔
1463
            return;
1✔
1464
          }
1465
          updateSubchannelPicker(newPicker);
1✔
1466
          // It's not appropriate to report SHUTDOWN state from lb.
1467
          // Ignore the case of newState == SHUTDOWN for now.
1468
          if (newState != SHUTDOWN) {
1✔
1469
            channelLogger.log(
1✔
1470
                ChannelLogLevel.INFO, "Entering {0} state with picker: {1}", newState, newPicker);
1471
            channelStateManager.gotoState(newState);
1✔
1472
          }
1473
        }
1✔
1474
      }
1475

1476
      syncContext.execute(new UpdateBalancingState());
1✔
1477
    }
1✔
1478

1479
    @Override
1480
    public void refreshNameResolution() {
1481
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1482
      final class LoadBalancerRefreshNameResolution implements Runnable {
1✔
1483
        @Override
1484
        public void run() {
1485
          ManagedChannelImpl.this.refreshNameResolution();
1✔
1486
        }
1✔
1487
      }
1488

1489
      syncContext.execute(new LoadBalancerRefreshNameResolution());
1✔
1490
    }
1✔
1491

1492
    @Override
1493
    public ManagedChannel createOobChannel(EquivalentAddressGroup addressGroup, String authority) {
1494
      return createOobChannel(Collections.singletonList(addressGroup), authority);
×
1495
    }
1496

1497
    @Override
1498
    public ManagedChannel createOobChannel(List<EquivalentAddressGroup> addressGroup,
1499
        String authority) {
1500
      // TODO(ejona): can we be even stricter? Like terminating?
1501
      checkState(!terminated, "Channel is terminated");
1✔
1502
      long oobChannelCreationTime = timeProvider.currentTimeNanos();
1✔
1503
      InternalLogId oobLogId = InternalLogId.allocate("OobChannel", /*details=*/ null);
1✔
1504
      InternalLogId subchannelLogId =
1✔
1505
          InternalLogId.allocate("Subchannel-OOB", /*details=*/ authority);
1✔
1506
      ChannelTracer oobChannelTracer =
1✔
1507
          new ChannelTracer(
1508
              oobLogId, maxTraceEvents, oobChannelCreationTime,
1✔
1509
              "OobChannel for " + addressGroup);
1510
      final OobChannel oobChannel = new OobChannel(
1✔
1511
          authority, balancerRpcExecutorPool, oobTransportFactory.getScheduledExecutorService(),
1✔
1512
          syncContext, callTracerFactory.create(), oobChannelTracer, channelz, timeProvider);
1✔
1513
      channelTracer.reportEvent(new ChannelTrace.Event.Builder()
1✔
1514
          .setDescription("Child OobChannel created")
1✔
1515
          .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
1✔
1516
          .setTimestampNanos(oobChannelCreationTime)
1✔
1517
          .setChannelRef(oobChannel)
1✔
1518
          .build());
1✔
1519
      ChannelTracer subchannelTracer =
1✔
1520
          new ChannelTracer(subchannelLogId, maxTraceEvents, oobChannelCreationTime,
1✔
1521
              "Subchannel for " + addressGroup);
1522
      ChannelLogger subchannelLogger = new ChannelLoggerImpl(subchannelTracer, timeProvider);
1✔
1523
      final class ManagedOobChannelCallback extends InternalSubchannel.Callback {
1✔
1524
        @Override
1525
        void onTerminated(InternalSubchannel is) {
1526
          oobChannels.remove(oobChannel);
1✔
1527
          channelz.removeSubchannel(is);
1✔
1528
          oobChannel.handleSubchannelTerminated();
1✔
1529
          maybeTerminateChannel();
1✔
1530
        }
1✔
1531

1532
        @Override
1533
        void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) {
1534
          // TODO(chengyuanzhang): change to let LB policies explicitly manage OOB channel's
1535
          //  state and refresh name resolution if necessary.
1536
          handleInternalSubchannelState(newState);
1✔
1537
          oobChannel.handleSubchannelStateChange(newState);
1✔
1538
        }
1✔
1539
      }
1540

1541
      final InternalSubchannel internalSubchannel = new InternalSubchannel(
1✔
1542
          addressGroup,
1543
          authority, userAgent, backoffPolicyProvider, oobTransportFactory,
1✔
1544
          oobTransportFactory.getScheduledExecutorService(), stopwatchSupplier, syncContext,
1✔
1545
          // All callback methods are run from syncContext
1546
          new ManagedOobChannelCallback(),
1547
          channelz,
1✔
1548
          callTracerFactory.create(),
1✔
1549
          subchannelTracer,
1550
          subchannelLogId,
1551
          subchannelLogger);
1552
      oobChannelTracer.reportEvent(new ChannelTrace.Event.Builder()
1✔
1553
          .setDescription("Child Subchannel created")
1✔
1554
          .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
1✔
1555
          .setTimestampNanos(oobChannelCreationTime)
1✔
1556
          .setSubchannelRef(internalSubchannel)
1✔
1557
          .build());
1✔
1558
      channelz.addSubchannel(oobChannel);
1✔
1559
      channelz.addSubchannel(internalSubchannel);
1✔
1560
      oobChannel.setSubchannel(internalSubchannel);
1✔
1561
      final class AddOobChannel implements Runnable {
1✔
1562
        @Override
1563
        public void run() {
1564
          if (terminating) {
1✔
1565
            oobChannel.shutdown();
×
1566
          }
1567
          if (!terminated) {
1✔
1568
            // If channel has not terminated, it will track the subchannel and block termination
1569
            // for it.
1570
            oobChannels.add(oobChannel);
1✔
1571
          }
1572
        }
1✔
1573
      }
1574

1575
      syncContext.execute(new AddOobChannel());
1✔
1576
      return oobChannel;
1✔
1577
    }
1578

1579
    @Deprecated
1580
    @Override
1581
    public ManagedChannelBuilder<?> createResolvingOobChannelBuilder(String target) {
1582
      return createResolvingOobChannelBuilder(target, new DefaultChannelCreds())
1✔
1583
          // Override authority to keep the old behavior.
1584
          // createResolvingOobChannelBuilder(String target) will be deleted soon.
1585
          .overrideAuthority(getAuthority());
1✔
1586
    }
1587

1588
    // TODO(creamsoup) prevent main channel to shutdown if oob channel is not terminated
1589
    // TODO(zdapeng) register the channel as a subchannel of the parent channel in channelz.
1590
    @Override
1591
    public ManagedChannelBuilder<?> createResolvingOobChannelBuilder(
1592
        final String target, final ChannelCredentials channelCreds) {
1593
      checkNotNull(channelCreds, "channelCreds");
1✔
1594

1595
      final class ResolvingOobChannelBuilder
1596
          extends ForwardingChannelBuilder<ResolvingOobChannelBuilder> {
1597
        final ManagedChannelBuilder<?> delegate;
1598

1599
        ResolvingOobChannelBuilder() {
1✔
1600
          final ClientTransportFactory transportFactory;
1601
          CallCredentials callCredentials;
1602
          if (channelCreds instanceof DefaultChannelCreds) {
1✔
1603
            transportFactory = originalTransportFactory;
1✔
1604
            callCredentials = null;
1✔
1605
          } else {
1606
            SwapChannelCredentialsResult swapResult =
1✔
1607
                originalTransportFactory.swapChannelCredentials(channelCreds);
1✔
1608
            if (swapResult == null) {
1✔
1609
              delegate = Grpc.newChannelBuilder(target, channelCreds);
×
1610
              return;
×
1611
            } else {
1612
              transportFactory = swapResult.transportFactory;
1✔
1613
              callCredentials = swapResult.callCredentials;
1✔
1614
            }
1615
          }
1616
          ClientTransportFactoryBuilder transportFactoryBuilder =
1✔
1617
              new ClientTransportFactoryBuilder() {
1✔
1618
                @Override
1619
                public ClientTransportFactory buildClientTransportFactory() {
1620
                  return transportFactory;
1✔
1621
                }
1622
              };
1623
          delegate = new ManagedChannelImplBuilder(
1✔
1624
              target,
1625
              channelCreds,
1626
              callCredentials,
1627
              transportFactoryBuilder,
1628
              new FixedPortProvider(nameResolverArgs.getDefaultPort()));
1✔
1629
        }
1✔
1630

1631
        @Override
1632
        protected ManagedChannelBuilder<?> delegate() {
1633
          return delegate;
1✔
1634
        }
1635
      }
1636

1637
      checkState(!terminated, "Channel is terminated");
1✔
1638

1639
      @SuppressWarnings("deprecation")
1640
      ResolvingOobChannelBuilder builder = new ResolvingOobChannelBuilder()
1✔
1641
          .nameResolverFactory(nameResolverFactory);
1✔
1642

1643
      return builder
1✔
1644
          // TODO(zdapeng): executors should not outlive the parent channel.
1645
          .executor(executor)
1✔
1646
          .offloadExecutor(offloadExecutorHolder.getExecutor())
1✔
1647
          .maxTraceEvents(maxTraceEvents)
1✔
1648
          .proxyDetector(nameResolverArgs.getProxyDetector())
1✔
1649
          .userAgent(userAgent);
1✔
1650
    }
1651

1652
    @Override
1653
    public ChannelCredentials getUnsafeChannelCredentials() {
1654
      if (originalChannelCreds == null) {
×
1655
        return new DefaultChannelCreds();
×
1656
      }
1657
      return originalChannelCreds;
×
1658
    }
1659

1660
    @Override
1661
    public void updateOobChannelAddresses(ManagedChannel channel, EquivalentAddressGroup eag) {
1662
      updateOobChannelAddresses(channel, Collections.singletonList(eag));
×
1663
    }
×
1664

1665
    @Override
1666
    public void updateOobChannelAddresses(ManagedChannel channel,
1667
        List<EquivalentAddressGroup> eag) {
1668
      checkArgument(channel instanceof OobChannel,
1✔
1669
          "channel must have been returned from createOobChannel");
1670
      ((OobChannel) channel).updateAddresses(eag);
1✔
1671
    }
1✔
1672

1673
    @Override
1674
    public String getAuthority() {
1675
      return ManagedChannelImpl.this.authority();
1✔
1676
    }
1677

1678
    @Override
1679
    public SynchronizationContext getSynchronizationContext() {
1680
      return syncContext;
1✔
1681
    }
1682

1683
    @Override
1684
    public ScheduledExecutorService getScheduledExecutorService() {
1685
      return scheduledExecutor;
1✔
1686
    }
1687

1688
    @Override
1689
    public ChannelLogger getChannelLogger() {
1690
      return channelLogger;
1✔
1691
    }
1692

1693
    @Override
1694
    public NameResolver.Args getNameResolverArgs() {
1695
      return nameResolverArgs;
1✔
1696
    }
1697

1698
    @Override
1699
    public NameResolverRegistry getNameResolverRegistry() {
1700
      return nameResolverRegistry;
1✔
1701
    }
1702

1703
    /**
1704
     * A placeholder for channel creds if user did not specify channel creds for the channel.
1705
     */
1706
    // TODO(zdapeng): get rid of this class and let all ChannelBuilders always provide a non-null
1707
    //     channel creds.
1708
    final class DefaultChannelCreds extends ChannelCredentials {
1✔
1709
      @Override
1710
      public ChannelCredentials withoutBearerTokens() {
1711
        return this;
×
1712
      }
1713
    }
1714
  }
1715

1716
  final class NameResolverListener extends NameResolver.Listener2 {
1717
    final LbHelperImpl helper;
1718
    final NameResolver resolver;
1719

1720
    NameResolverListener(LbHelperImpl helperImpl, NameResolver resolver) {
1✔
1721
      this.helper = checkNotNull(helperImpl, "helperImpl");
1✔
1722
      this.resolver = checkNotNull(resolver, "resolver");
1✔
1723
    }
1✔
1724

1725
    @Override
1726
    public void onResult(final ResolutionResult resolutionResult) {
1727
      final class NamesResolved implements Runnable {
1✔
1728

1729
        @SuppressWarnings("ReferenceEquality")
1730
        @Override
1731
        public void run() {
1732
          if (ManagedChannelImpl.this.nameResolver != resolver) {
1✔
1733
            return;
1✔
1734
          }
1735

1736
          List<EquivalentAddressGroup> servers = resolutionResult.getAddresses();
1✔
1737
          channelLogger.log(
1✔
1738
              ChannelLogLevel.DEBUG,
1739
              "Resolved address: {0}, config={1}",
1740
              servers,
1741
              resolutionResult.getAttributes());
1✔
1742

1743
          if (lastResolutionState != ResolutionState.SUCCESS) {
1✔
1744
            channelLogger.log(ChannelLogLevel.INFO, "Address resolved: {0}", servers);
1✔
1745
            lastResolutionState = ResolutionState.SUCCESS;
1✔
1746
          }
1747

1748
          ConfigOrError configOrError = resolutionResult.getServiceConfig();
1✔
1749
          ResolutionResultListener resolutionResultListener = resolutionResult.getAttributes()
1✔
1750
              .get(RetryingNameResolver.RESOLUTION_RESULT_LISTENER_KEY);
1✔
1751
          InternalConfigSelector resolvedConfigSelector =
1✔
1752
              resolutionResult.getAttributes().get(InternalConfigSelector.KEY);
1✔
1753
          ManagedChannelServiceConfig validServiceConfig =
1754
              configOrError != null && configOrError.getConfig() != null
1✔
1755
                  ? (ManagedChannelServiceConfig) configOrError.getConfig()
1✔
1756
                  : null;
1✔
1757
          Status serviceConfigError = configOrError != null ? configOrError.getError() : null;
1✔
1758

1759
          ManagedChannelServiceConfig effectiveServiceConfig;
1760
          if (!lookUpServiceConfig) {
1✔
1761
            if (validServiceConfig != null) {
1✔
1762
              channelLogger.log(
1✔
1763
                  ChannelLogLevel.INFO,
1764
                  "Service config from name resolver discarded by channel settings");
1765
            }
1766
            effectiveServiceConfig =
1767
                defaultServiceConfig == null ? EMPTY_SERVICE_CONFIG : defaultServiceConfig;
1✔
1768
            if (resolvedConfigSelector != null) {
1✔
1769
              channelLogger.log(
1✔
1770
                  ChannelLogLevel.INFO,
1771
                  "Config selector from name resolver discarded by channel settings");
1772
            }
1773
            realChannel.updateConfigSelector(effectiveServiceConfig.getDefaultConfigSelector());
1✔
1774
          } else {
1775
            // Try to use config if returned from name resolver
1776
            // Otherwise, try to use the default config if available
1777
            if (validServiceConfig != null) {
1✔
1778
              effectiveServiceConfig = validServiceConfig;
1✔
1779
              if (resolvedConfigSelector != null) {
1✔
1780
                realChannel.updateConfigSelector(resolvedConfigSelector);
1✔
1781
                if (effectiveServiceConfig.getDefaultConfigSelector() != null) {
1✔
1782
                  channelLogger.log(
×
1783
                      ChannelLogLevel.DEBUG,
1784
                      "Method configs in service config will be discarded due to presence of"
1785
                          + "config-selector");
1786
                }
1787
              } else {
1788
                realChannel.updateConfigSelector(effectiveServiceConfig.getDefaultConfigSelector());
1✔
1789
              }
1790
            } else if (defaultServiceConfig != null) {
1✔
1791
              effectiveServiceConfig = defaultServiceConfig;
1✔
1792
              realChannel.updateConfigSelector(effectiveServiceConfig.getDefaultConfigSelector());
1✔
1793
              channelLogger.log(
1✔
1794
                  ChannelLogLevel.INFO,
1795
                  "Received no service config, using default service config");
1796
            } else if (serviceConfigError != null) {
1✔
1797
              if (!serviceConfigUpdated) {
1✔
1798
                // First DNS lookup has invalid service config, and cannot fall back to default
1799
                channelLogger.log(
1✔
1800
                    ChannelLogLevel.INFO,
1801
                    "Fallback to error due to invalid first service config without default config");
1802
                // This error could be an "inappropriate" control plane error that should not bleed
1803
                // through to client code using gRPC. We let them flow through here to the LB as
1804
                // we later check for these error codes when investigating pick results in
1805
                // GrpcUtil.getTransportFromPickResult().
1806
                onError(configOrError.getError());
1✔
1807
                if (resolutionResultListener != null) {
1✔
1808
                  resolutionResultListener.resolutionAttempted(false);
1✔
1809
                }
1810
                return;
1✔
1811
              } else {
1812
                effectiveServiceConfig = lastServiceConfig;
1✔
1813
              }
1814
            } else {
1815
              effectiveServiceConfig = EMPTY_SERVICE_CONFIG;
1✔
1816
              realChannel.updateConfigSelector(null);
1✔
1817
            }
1818
            if (!effectiveServiceConfig.equals(lastServiceConfig)) {
1✔
1819
              channelLogger.log(
1✔
1820
                  ChannelLogLevel.INFO,
1821
                  "Service config changed{0}",
1822
                  effectiveServiceConfig == EMPTY_SERVICE_CONFIG ? " to empty" : "");
1✔
1823
              lastServiceConfig = effectiveServiceConfig;
1✔
1824
              transportProvider.throttle = effectiveServiceConfig.getRetryThrottling();
1✔
1825
            }
1826

1827
            try {
1828
              // TODO(creamsoup): when `servers` is empty and lastResolutionStateCopy == SUCCESS
1829
              //  and lbNeedAddress, it shouldn't call the handleServiceConfigUpdate. But,
1830
              //  lbNeedAddress is not deterministic
1831
              serviceConfigUpdated = true;
1✔
1832
            } catch (RuntimeException re) {
×
1833
              logger.log(
×
1834
                  Level.WARNING,
1835
                  "[" + getLogId() + "] Unexpected exception from parsing service config",
×
1836
                  re);
1837
            }
1✔
1838
          }
1839

1840
          Attributes effectiveAttrs = resolutionResult.getAttributes();
1✔
1841
          // Call LB only if it's not shutdown.  If LB is shutdown, lbHelper won't match.
1842
          if (NameResolverListener.this.helper == ManagedChannelImpl.this.lbHelper) {
1✔
1843
            Attributes.Builder attrBuilder =
1✔
1844
                effectiveAttrs.toBuilder().discard(InternalConfigSelector.KEY);
1✔
1845
            Map<String, ?> healthCheckingConfig =
1✔
1846
                effectiveServiceConfig.getHealthCheckingConfig();
1✔
1847
            if (healthCheckingConfig != null) {
1✔
1848
              attrBuilder
1✔
1849
                  .set(LoadBalancer.ATTR_HEALTH_CHECKING_CONFIG, healthCheckingConfig)
1✔
1850
                  .build();
1✔
1851
            }
1852
            Attributes attributes = attrBuilder.build();
1✔
1853

1854
            boolean lastAddressesAccepted = helper.lb.tryAcceptResolvedAddresses(
1✔
1855
                ResolvedAddresses.newBuilder()
1✔
1856
                    .setAddresses(servers)
1✔
1857
                    .setAttributes(attributes)
1✔
1858
                    .setLoadBalancingPolicyConfig(effectiveServiceConfig.getLoadBalancingConfig())
1✔
1859
                    .build());
1✔
1860
            // If a listener is provided, let it know if the addresses were accepted.
1861
            if (resolutionResultListener != null) {
1✔
1862
              resolutionResultListener.resolutionAttempted(lastAddressesAccepted);
1✔
1863
            }
1864
          }
1865
        }
1✔
1866
      }
1867

1868
      syncContext.execute(new NamesResolved());
1✔
1869
    }
1✔
1870

1871
    @Override
1872
    public void onError(final Status error) {
1873
      checkArgument(!error.isOk(), "the error status must not be OK");
1✔
1874
      final class NameResolverErrorHandler implements Runnable {
1✔
1875
        @Override
1876
        public void run() {
1877
          handleErrorInSyncContext(error);
1✔
1878
        }
1✔
1879
      }
1880

1881
      syncContext.execute(new NameResolverErrorHandler());
1✔
1882
    }
1✔
1883

1884
    private void handleErrorInSyncContext(Status error) {
1885
      logger.log(Level.WARNING, "[{0}] Failed to resolve name. status={1}",
1✔
1886
          new Object[] {getLogId(), error});
1✔
1887
      realChannel.onConfigError();
1✔
1888
      if (lastResolutionState != ResolutionState.ERROR) {
1✔
1889
        channelLogger.log(ChannelLogLevel.WARNING, "Failed to resolve name: {0}", error);
1✔
1890
        lastResolutionState = ResolutionState.ERROR;
1✔
1891
      }
1892
      // Call LB only if it's not shutdown.  If LB is shutdown, lbHelper won't match.
1893
      if (NameResolverListener.this.helper != ManagedChannelImpl.this.lbHelper) {
1✔
1894
        return;
1✔
1895
      }
1896

1897
      helper.lb.handleNameResolutionError(error);
1✔
1898
    }
1✔
1899
  }
1900

1901
  private final class SubchannelImpl extends AbstractSubchannel {
1902
    final CreateSubchannelArgs args;
1903
    final InternalLogId subchannelLogId;
1904
    final ChannelLoggerImpl subchannelLogger;
1905
    final ChannelTracer subchannelTracer;
1906
    List<EquivalentAddressGroup> addressGroups;
1907
    InternalSubchannel subchannel;
1908
    boolean started;
1909
    boolean shutdown;
1910
    ScheduledHandle delayedShutdownTask;
1911

1912
    SubchannelImpl(CreateSubchannelArgs args) {
1✔
1913
      checkNotNull(args, "args");
1✔
1914
      addressGroups = args.getAddresses();
1✔
1915
      if (authorityOverride != null) {
1✔
1916
        List<EquivalentAddressGroup> eagsWithoutOverrideAttr =
1✔
1917
            stripOverrideAuthorityAttributes(args.getAddresses());
1✔
1918
        args = args.toBuilder().setAddresses(eagsWithoutOverrideAttr).build();
1✔
1919
      }
1920
      this.args = args;
1✔
1921
      subchannelLogId = InternalLogId.allocate("Subchannel", /*details=*/ authority());
1✔
1922
      subchannelTracer = new ChannelTracer(
1✔
1923
          subchannelLogId, maxTraceEvents, timeProvider.currentTimeNanos(),
1✔
1924
          "Subchannel for " + args.getAddresses());
1✔
1925
      subchannelLogger = new ChannelLoggerImpl(subchannelTracer, timeProvider);
1✔
1926
    }
1✔
1927

1928
    @Override
1929
    public void start(final SubchannelStateListener listener) {
1930
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1931
      checkState(!started, "already started");
1✔
1932
      checkState(!shutdown, "already shutdown");
1✔
1933
      checkState(!terminating, "Channel is being terminated");
1✔
1934
      started = true;
1✔
1935
      final class ManagedInternalSubchannelCallback extends InternalSubchannel.Callback {
1✔
1936
        // All callbacks are run in syncContext
1937
        @Override
1938
        void onTerminated(InternalSubchannel is) {
1939
          subchannels.remove(is);
1✔
1940
          channelz.removeSubchannel(is);
1✔
1941
          maybeTerminateChannel();
1✔
1942
        }
1✔
1943

1944
        @Override
1945
        void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) {
1946
          checkState(listener != null, "listener is null");
1✔
1947
          listener.onSubchannelState(newState);
1✔
1948
        }
1✔
1949

1950
        @Override
1951
        void onInUse(InternalSubchannel is) {
1952
          inUseStateAggregator.updateObjectInUse(is, true);
1✔
1953
        }
1✔
1954

1955
        @Override
1956
        void onNotInUse(InternalSubchannel is) {
1957
          inUseStateAggregator.updateObjectInUse(is, false);
1✔
1958
        }
1✔
1959
      }
1960

1961
      final InternalSubchannel internalSubchannel = new InternalSubchannel(
1✔
1962
          args.getAddresses(),
1✔
1963
          authority(),
1✔
1964
          userAgent,
1✔
1965
          backoffPolicyProvider,
1✔
1966
          transportFactory,
1✔
1967
          transportFactory.getScheduledExecutorService(),
1✔
1968
          stopwatchSupplier,
1✔
1969
          syncContext,
1970
          new ManagedInternalSubchannelCallback(),
1971
          channelz,
1✔
1972
          callTracerFactory.create(),
1✔
1973
          subchannelTracer,
1974
          subchannelLogId,
1975
          subchannelLogger);
1976

1977
      channelTracer.reportEvent(new ChannelTrace.Event.Builder()
1✔
1978
          .setDescription("Child Subchannel started")
1✔
1979
          .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
1✔
1980
          .setTimestampNanos(timeProvider.currentTimeNanos())
1✔
1981
          .setSubchannelRef(internalSubchannel)
1✔
1982
          .build());
1✔
1983

1984
      this.subchannel = internalSubchannel;
1✔
1985
      channelz.addSubchannel(internalSubchannel);
1✔
1986
      subchannels.add(internalSubchannel);
1✔
1987
    }
1✔
1988

1989
    @Override
1990
    InternalInstrumented<ChannelStats> getInstrumentedInternalSubchannel() {
1991
      checkState(started, "not started");
1✔
1992
      return subchannel;
1✔
1993
    }
1994

1995
    @Override
1996
    public void shutdown() {
1997
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1998
      if (subchannel == null) {
1✔
1999
        // start() was not successful
2000
        shutdown = true;
×
2001
        return;
×
2002
      }
2003
      if (shutdown) {
1✔
2004
        if (terminating && delayedShutdownTask != null) {
1✔
2005
          // shutdown() was previously called when terminating == false, thus a delayed shutdown()
2006
          // was scheduled.  Now since terminating == true, We should expedite the shutdown.
2007
          delayedShutdownTask.cancel();
×
2008
          delayedShutdownTask = null;
×
2009
          // Will fall through to the subchannel.shutdown() at the end.
2010
        } else {
2011
          return;
1✔
2012
        }
2013
      } else {
2014
        shutdown = true;
1✔
2015
      }
2016
      // Add a delay to shutdown to deal with the race between 1) a transport being picked and
2017
      // newStream() being called on it, and 2) its Subchannel is shut down by LoadBalancer (e.g.,
2018
      // because of address change, or because LoadBalancer is shutdown by Channel entering idle
2019
      // mode). If (2) wins, the app will see a spurious error. We work around this by delaying
2020
      // shutdown of Subchannel for a few seconds here.
2021
      //
2022
      // TODO(zhangkun83): consider a better approach
2023
      // (https://github.com/grpc/grpc-java/issues/2562).
2024
      if (!terminating) {
1✔
2025
        final class ShutdownSubchannel implements Runnable {
1✔
2026
          @Override
2027
          public void run() {
2028
            subchannel.shutdown(SUBCHANNEL_SHUTDOWN_STATUS);
1✔
2029
          }
1✔
2030
        }
2031

2032
        delayedShutdownTask = syncContext.schedule(
1✔
2033
            new LogExceptionRunnable(new ShutdownSubchannel()),
2034
            SUBCHANNEL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS,
2035
            transportFactory.getScheduledExecutorService());
1✔
2036
        return;
1✔
2037
      }
2038
      // When terminating == true, no more real streams will be created. It's safe and also
2039
      // desirable to shutdown timely.
2040
      subchannel.shutdown(SHUTDOWN_STATUS);
1✔
2041
    }
1✔
2042

2043
    @Override
2044
    public void requestConnection() {
2045
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
2046
      checkState(started, "not started");
1✔
2047
      subchannel.obtainActiveTransport();
1✔
2048
    }
1✔
2049

2050
    @Override
2051
    public List<EquivalentAddressGroup> getAllAddresses() {
2052
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
2053
      checkState(started, "not started");
1✔
2054
      return addressGroups;
1✔
2055
    }
2056

2057
    @Override
2058
    public Attributes getAttributes() {
2059
      return args.getAttributes();
1✔
2060
    }
2061

2062
    @Override
2063
    public String toString() {
2064
      return subchannelLogId.toString();
1✔
2065
    }
2066

2067
    @Override
2068
    public Channel asChannel() {
2069
      checkState(started, "not started");
1✔
2070
      return new SubchannelChannel(
1✔
2071
          subchannel, balancerRpcExecutorHolder.getExecutor(),
1✔
2072
          transportFactory.getScheduledExecutorService(),
1✔
2073
          callTracerFactory.create(),
1✔
2074
          new AtomicReference<InternalConfigSelector>(null));
2075
    }
2076

2077
    @Override
2078
    public Object getInternalSubchannel() {
2079
      checkState(started, "Subchannel is not started");
1✔
2080
      return subchannel;
1✔
2081
    }
2082

2083
    @Override
2084
    public ChannelLogger getChannelLogger() {
2085
      return subchannelLogger;
1✔
2086
    }
2087

2088
    @Override
2089
    public void updateAddresses(List<EquivalentAddressGroup> addrs) {
2090
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
2091
      addressGroups = addrs;
1✔
2092
      if (authorityOverride != null) {
1✔
2093
        addrs = stripOverrideAuthorityAttributes(addrs);
1✔
2094
      }
2095
      subchannel.updateAddresses(addrs);
1✔
2096
    }
1✔
2097

2098
    private List<EquivalentAddressGroup> stripOverrideAuthorityAttributes(
2099
        List<EquivalentAddressGroup> eags) {
2100
      List<EquivalentAddressGroup> eagsWithoutOverrideAttr = new ArrayList<>();
1✔
2101
      for (EquivalentAddressGroup eag : eags) {
1✔
2102
        EquivalentAddressGroup eagWithoutOverrideAttr = new EquivalentAddressGroup(
1✔
2103
            eag.getAddresses(),
1✔
2104
            eag.getAttributes().toBuilder().discard(ATTR_AUTHORITY_OVERRIDE).build());
1✔
2105
        eagsWithoutOverrideAttr.add(eagWithoutOverrideAttr);
1✔
2106
      }
1✔
2107
      return Collections.unmodifiableList(eagsWithoutOverrideAttr);
1✔
2108
    }
2109
  }
2110

2111
  @Override
2112
  public String toString() {
2113
    return MoreObjects.toStringHelper(this)
1✔
2114
        .add("logId", logId.getId())
1✔
2115
        .add("target", target)
1✔
2116
        .toString();
1✔
2117
  }
2118

2119
  /**
2120
   * Called from syncContext.
2121
   */
2122
  private final class DelayedTransportListener implements ManagedClientTransport.Listener {
1✔
2123
    @Override
2124
    public void transportShutdown(Status s) {
2125
      checkState(shutdown.get(), "Channel must have been shut down");
1✔
2126
    }
1✔
2127

2128
    @Override
2129
    public void transportReady() {
2130
      // Don't care
2131
    }
×
2132

2133
    @Override
2134
    public void transportInUse(final boolean inUse) {
2135
      inUseStateAggregator.updateObjectInUse(delayedTransport, inUse);
1✔
2136
    }
1✔
2137

2138
    @Override
2139
    public void transportTerminated() {
2140
      checkState(shutdown.get(), "Channel must have been shut down");
1✔
2141
      terminating = true;
1✔
2142
      shutdownNameResolverAndLoadBalancer(false);
1✔
2143
      // No need to call channelStateManager since we are already in SHUTDOWN state.
2144
      // Until LoadBalancer is shutdown, it may still create new subchannels.  We catch them
2145
      // here.
2146
      maybeShutdownNowSubchannels();
1✔
2147
      maybeTerminateChannel();
1✔
2148
    }
1✔
2149
  }
2150

2151
  /**
2152
   * Must be accessed from syncContext.
2153
   */
2154
  private final class IdleModeStateAggregator extends InUseStateAggregator<Object> {
1✔
2155
    @Override
2156
    protected void handleInUse() {
2157
      exitIdleMode();
1✔
2158
    }
1✔
2159

2160
    @Override
2161
    protected void handleNotInUse() {
2162
      if (shutdown.get()) {
1✔
2163
        return;
1✔
2164
      }
2165
      rescheduleIdleTimer();
1✔
2166
    }
1✔
2167
  }
2168

2169
  /**
2170
   * Lazily request for Executor from an executor pool.
2171
   * Also act as an Executor directly to simply run a cmd
2172
   */
2173
  @VisibleForTesting
2174
  static final class ExecutorHolder implements Executor {
2175
    private final ObjectPool<? extends Executor> pool;
2176
    private Executor executor;
2177

2178
    ExecutorHolder(ObjectPool<? extends Executor> executorPool) {
1✔
2179
      this.pool = checkNotNull(executorPool, "executorPool");
1✔
2180
    }
1✔
2181

2182
    synchronized Executor getExecutor() {
2183
      if (executor == null) {
1✔
2184
        executor = checkNotNull(pool.getObject(), "%s.getObject()", executor);
1✔
2185
      }
2186
      return executor;
1✔
2187
    }
2188

2189
    synchronized void release() {
2190
      if (executor != null) {
1✔
2191
        executor = pool.returnObject(executor);
1✔
2192
      }
2193
    }
1✔
2194

2195
    @Override
2196
    public void execute(Runnable command) {
2197
      getExecutor().execute(command);
1✔
2198
    }
1✔
2199
  }
2200

2201
  private static final class RestrictedScheduledExecutor implements ScheduledExecutorService {
2202
    final ScheduledExecutorService delegate;
2203

2204
    private RestrictedScheduledExecutor(ScheduledExecutorService delegate) {
1✔
2205
      this.delegate = checkNotNull(delegate, "delegate");
1✔
2206
    }
1✔
2207

2208
    @Override
2209
    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
2210
      return delegate.schedule(callable, delay, unit);
×
2211
    }
2212

2213
    @Override
2214
    public ScheduledFuture<?> schedule(Runnable cmd, long delay, TimeUnit unit) {
2215
      return delegate.schedule(cmd, delay, unit);
1✔
2216
    }
2217

2218
    @Override
2219
    public ScheduledFuture<?> scheduleAtFixedRate(
2220
        Runnable command, long initialDelay, long period, TimeUnit unit) {
2221
      return delegate.scheduleAtFixedRate(command, initialDelay, period, unit);
×
2222
    }
2223

2224
    @Override
2225
    public ScheduledFuture<?> scheduleWithFixedDelay(
2226
        Runnable command, long initialDelay, long delay, TimeUnit unit) {
2227
      return delegate.scheduleWithFixedDelay(command, initialDelay, delay, unit);
×
2228
    }
2229

2230
    @Override
2231
    public boolean awaitTermination(long timeout, TimeUnit unit)
2232
        throws InterruptedException {
2233
      return delegate.awaitTermination(timeout, unit);
×
2234
    }
2235

2236
    @Override
2237
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
2238
        throws InterruptedException {
2239
      return delegate.invokeAll(tasks);
×
2240
    }
2241

2242
    @Override
2243
    public <T> List<Future<T>> invokeAll(
2244
        Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
2245
        throws InterruptedException {
2246
      return delegate.invokeAll(tasks, timeout, unit);
×
2247
    }
2248

2249
    @Override
2250
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
2251
        throws InterruptedException, ExecutionException {
2252
      return delegate.invokeAny(tasks);
×
2253
    }
2254

2255
    @Override
2256
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
2257
        throws InterruptedException, ExecutionException, TimeoutException {
2258
      return delegate.invokeAny(tasks, timeout, unit);
×
2259
    }
2260

2261
    @Override
2262
    public boolean isShutdown() {
2263
      return delegate.isShutdown();
×
2264
    }
2265

2266
    @Override
2267
    public boolean isTerminated() {
2268
      return delegate.isTerminated();
×
2269
    }
2270

2271
    @Override
2272
    public void shutdown() {
2273
      throw new UnsupportedOperationException("Restricted: shutdown() is not allowed");
1✔
2274
    }
2275

2276
    @Override
2277
    public List<Runnable> shutdownNow() {
2278
      throw new UnsupportedOperationException("Restricted: shutdownNow() is not allowed");
1✔
2279
    }
2280

2281
    @Override
2282
    public <T> Future<T> submit(Callable<T> task) {
2283
      return delegate.submit(task);
×
2284
    }
2285

2286
    @Override
2287
    public Future<?> submit(Runnable task) {
2288
      return delegate.submit(task);
×
2289
    }
2290

2291
    @Override
2292
    public <T> Future<T> submit(Runnable task, T result) {
2293
      return delegate.submit(task, result);
×
2294
    }
2295

2296
    @Override
2297
    public void execute(Runnable command) {
2298
      delegate.execute(command);
×
2299
    }
×
2300
  }
2301

2302
  /**
   * Records how the most recent name resolution turned out.
   *
   * <p>NOTE(review): the per-constant notes below are inferred from the constant names only;
   * confirm them against the code that assigns this state.
   */
  enum ResolutionState {
    // Presumably: no resolution result has been observed yet.
    NO_RESOLUTION,
    // Presumably: the last resolution delivered a usable result.
    SUCCESS,
    // Presumably: the last resolution reported a failure.
    ERROR
  }
2310
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc