• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19216

09 May 2024 02:28AM UTC coverage: 88.369% (-0.007%) from 88.376%
#19216

push

github

ejona86
opentelemetry: Add grpc.target label to per-call metrics

As defined by gRFC A66, the target is on all client-side per-call
metrics (both call and attempt).

31538 of 35689 relevant lines covered (88.37%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.45
/../core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
1
/*
2
 * Copyright 2016 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static com.google.common.base.Preconditions.checkState;
22
import static io.grpc.ClientStreamTracer.NAME_RESOLUTION_DELAYED;
23
import static io.grpc.ConnectivityState.CONNECTING;
24
import static io.grpc.ConnectivityState.IDLE;
25
import static io.grpc.ConnectivityState.SHUTDOWN;
26
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
27
import static io.grpc.EquivalentAddressGroup.ATTR_AUTHORITY_OVERRIDE;
28

29
import com.google.common.annotations.VisibleForTesting;
30
import com.google.common.base.MoreObjects;
31
import com.google.common.base.Stopwatch;
32
import com.google.common.base.Supplier;
33
import com.google.common.util.concurrent.ListenableFuture;
34
import com.google.common.util.concurrent.SettableFuture;
35
import io.grpc.Attributes;
36
import io.grpc.CallCredentials;
37
import io.grpc.CallOptions;
38
import io.grpc.Channel;
39
import io.grpc.ChannelCredentials;
40
import io.grpc.ChannelLogger;
41
import io.grpc.ChannelLogger.ChannelLogLevel;
42
import io.grpc.ClientCall;
43
import io.grpc.ClientInterceptor;
44
import io.grpc.ClientInterceptors;
45
import io.grpc.ClientStreamTracer;
46
import io.grpc.ClientTransportFilter;
47
import io.grpc.CompressorRegistry;
48
import io.grpc.ConnectivityState;
49
import io.grpc.ConnectivityStateInfo;
50
import io.grpc.Context;
51
import io.grpc.Deadline;
52
import io.grpc.DecompressorRegistry;
53
import io.grpc.EquivalentAddressGroup;
54
import io.grpc.ForwardingChannelBuilder2;
55
import io.grpc.ForwardingClientCall;
56
import io.grpc.Grpc;
57
import io.grpc.InternalChannelz;
58
import io.grpc.InternalChannelz.ChannelStats;
59
import io.grpc.InternalChannelz.ChannelTrace;
60
import io.grpc.InternalConfigSelector;
61
import io.grpc.InternalInstrumented;
62
import io.grpc.InternalLogId;
63
import io.grpc.InternalWithLogId;
64
import io.grpc.LoadBalancer;
65
import io.grpc.LoadBalancer.CreateSubchannelArgs;
66
import io.grpc.LoadBalancer.PickResult;
67
import io.grpc.LoadBalancer.PickSubchannelArgs;
68
import io.grpc.LoadBalancer.ResolvedAddresses;
69
import io.grpc.LoadBalancer.SubchannelPicker;
70
import io.grpc.LoadBalancer.SubchannelStateListener;
71
import io.grpc.ManagedChannel;
72
import io.grpc.ManagedChannelBuilder;
73
import io.grpc.Metadata;
74
import io.grpc.MethodDescriptor;
75
import io.grpc.MetricInstrumentRegistry;
76
import io.grpc.MetricRecorder;
77
import io.grpc.NameResolver;
78
import io.grpc.NameResolver.ConfigOrError;
79
import io.grpc.NameResolver.ResolutionResult;
80
import io.grpc.NameResolverProvider;
81
import io.grpc.NameResolverRegistry;
82
import io.grpc.ProxyDetector;
83
import io.grpc.Status;
84
import io.grpc.SynchronizationContext;
85
import io.grpc.SynchronizationContext.ScheduledHandle;
86
import io.grpc.internal.AutoConfiguredLoadBalancerFactory.AutoConfiguredLoadBalancer;
87
import io.grpc.internal.ClientCallImpl.ClientStreamProvider;
88
import io.grpc.internal.ClientTransportFactory.SwapChannelCredentialsResult;
89
import io.grpc.internal.ManagedChannelImplBuilder.ClientTransportFactoryBuilder;
90
import io.grpc.internal.ManagedChannelImplBuilder.FixedPortProvider;
91
import io.grpc.internal.ManagedChannelServiceConfig.MethodInfo;
92
import io.grpc.internal.ManagedChannelServiceConfig.ServiceConfigConvertedSelector;
93
import io.grpc.internal.RetriableStream.ChannelBufferMeter;
94
import io.grpc.internal.RetriableStream.Throttle;
95
import io.grpc.internal.RetryingNameResolver.ResolutionResultListener;
96
import java.net.URI;
97
import java.util.ArrayList;
98
import java.util.Collection;
99
import java.util.Collections;
100
import java.util.HashSet;
101
import java.util.LinkedHashSet;
102
import java.util.List;
103
import java.util.Map;
104
import java.util.Set;
105
import java.util.concurrent.Callable;
106
import java.util.concurrent.CountDownLatch;
107
import java.util.concurrent.ExecutionException;
108
import java.util.concurrent.Executor;
109
import java.util.concurrent.Future;
110
import java.util.concurrent.ScheduledExecutorService;
111
import java.util.concurrent.ScheduledFuture;
112
import java.util.concurrent.TimeUnit;
113
import java.util.concurrent.TimeoutException;
114
import java.util.concurrent.atomic.AtomicBoolean;
115
import java.util.concurrent.atomic.AtomicReference;
116
import java.util.logging.Level;
117
import java.util.logging.Logger;
118
import javax.annotation.Nullable;
119
import javax.annotation.concurrent.GuardedBy;
120
import javax.annotation.concurrent.ThreadSafe;
121

122
/** A communication channel for making outgoing RPCs. */
123
@ThreadSafe
124
final class ManagedChannelImpl extends ManagedChannel implements
125
    InternalInstrumented<ChannelStats> {
126
  @VisibleForTesting
127
  static final Logger logger = Logger.getLogger(ManagedChannelImpl.class.getName());
1✔
128

129
  static final long IDLE_TIMEOUT_MILLIS_DISABLE = -1;
130

131
  static final long SUBCHANNEL_SHUTDOWN_DELAY_SECONDS = 5;
132

133
  @VisibleForTesting
134
  static final Status SHUTDOWN_NOW_STATUS =
1✔
135
      Status.UNAVAILABLE.withDescription("Channel shutdownNow invoked");
1✔
136

137
  @VisibleForTesting
138
  static final Status SHUTDOWN_STATUS =
1✔
139
      Status.UNAVAILABLE.withDescription("Channel shutdown invoked");
1✔
140

141
  @VisibleForTesting
142
  static final Status SUBCHANNEL_SHUTDOWN_STATUS =
1✔
143
      Status.UNAVAILABLE.withDescription("Subchannel shutdown invoked");
1✔
144

145
  private static final ManagedChannelServiceConfig EMPTY_SERVICE_CONFIG =
146
      ManagedChannelServiceConfig.empty();
1✔
147
  private static final InternalConfigSelector INITIAL_PENDING_SELECTOR =
1✔
148
      new InternalConfigSelector() {
1✔
149
        @Override
150
        public Result selectConfig(PickSubchannelArgs args) {
151
          throw new IllegalStateException("Resolution is pending");
×
152
        }
153
      };
154
  private static final LoadBalancer.PickDetailsConsumer NOOP_PICK_DETAILS_CONSUMER =
1✔
155
      new LoadBalancer.PickDetailsConsumer() {};
1✔
156

157
  private final InternalLogId logId;
158
  private final String target;
159
  @Nullable
160
  private final String authorityOverride;
161
  private final NameResolverRegistry nameResolverRegistry;
162
  private final URI targetUri;
163
  private final NameResolverProvider nameResolverProvider;
164
  private final NameResolver.Args nameResolverArgs;
165
  private final AutoConfiguredLoadBalancerFactory loadBalancerFactory;
166
  private final ClientTransportFactory originalTransportFactory;
167
  @Nullable
168
  private final ChannelCredentials originalChannelCreds;
169
  private final ClientTransportFactory transportFactory;
170
  private final ClientTransportFactory oobTransportFactory;
171
  private final RestrictedScheduledExecutor scheduledExecutor;
172
  private final Executor executor;
173
  private final ObjectPool<? extends Executor> executorPool;
174
  private final ObjectPool<? extends Executor> balancerRpcExecutorPool;
175
  private final ExecutorHolder balancerRpcExecutorHolder;
176
  private final ExecutorHolder offloadExecutorHolder;
177
  private final TimeProvider timeProvider;
178
  private final int maxTraceEvents;
179

180
  @VisibleForTesting
1✔
181
  final SynchronizationContext syncContext = new SynchronizationContext(
182
      new Thread.UncaughtExceptionHandler() {
1✔
183
        @Override
184
        public void uncaughtException(Thread t, Throwable e) {
185
          logger.log(
1✔
186
              Level.SEVERE,
187
              "[" + getLogId() + "] Uncaught exception in the SynchronizationContext. Panic!",
1✔
188
              e);
189
          panic(e);
1✔
190
        }
1✔
191
      });
192

193
  private boolean fullStreamDecompression;
194

195
  private final DecompressorRegistry decompressorRegistry;
196
  private final CompressorRegistry compressorRegistry;
197

198
  private final Supplier<Stopwatch> stopwatchSupplier;
199
  /** The timout before entering idle mode. */
200
  private final long idleTimeoutMillis;
201

202
  private final ConnectivityStateManager channelStateManager = new ConnectivityStateManager();
1✔
203
  private final BackoffPolicy.Provider backoffPolicyProvider;
204

205
  /**
206
   * We delegate to this channel, so that we can have interceptors as necessary. If there aren't
207
   * any interceptors and the {@link io.grpc.BinaryLog} is {@code null} then this will just be a
208
   * {@link RealChannel}.
209
   */
210
  private final Channel interceptorChannel;
211

212
  private final List<ClientTransportFilter> transportFilters;
213
  @Nullable private final String userAgent;
214

215
  // Only null after channel is terminated. Must be assigned from the syncContext.
216
  private NameResolver nameResolver;
217

218
  // Must be accessed from the syncContext.
219
  private boolean nameResolverStarted;
220

221
  // null when channel is in idle mode.  Must be assigned from syncContext.
222
  @Nullable
223
  private LbHelperImpl lbHelper;
224

225
  // Must ONLY be assigned from updateSubchannelPicker(), which is called from syncContext.
226
  // null if channel is in idle mode.
227
  @Nullable
228
  private volatile SubchannelPicker subchannelPicker;
229

230
  // Must be accessed from the syncContext
231
  private boolean panicMode;
232

233
  // Must be mutated from syncContext
234
  // If any monitoring hook to be added later needs to get a snapshot of this Set, we could
235
  // switch to a ConcurrentHashMap.
236
  private final Set<InternalSubchannel> subchannels = new HashSet<>(16, .75f);
1✔
237

238
  // Must be accessed from syncContext
239
  @Nullable
240
  private Collection<RealChannel.PendingCall<?, ?>> pendingCalls;
241
  private final Object pendingCallsInUseObject = new Object();
1✔
242

243
  // Must be mutated from syncContext
244
  private final Set<OobChannel> oobChannels = new HashSet<>(1, .75f);
1✔
245

246
  // reprocess() must be run from syncContext
247
  private final DelayedClientTransport delayedTransport;
248
  private final UncommittedRetriableStreamsRegistry uncommittedRetriableStreamsRegistry
1✔
249
      = new UncommittedRetriableStreamsRegistry();
250

251
  // Shutdown states.
252
  //
253
  // Channel's shutdown process:
254
  // 1. shutdown(): stop accepting new calls from applications
255
  //   1a shutdown <- true
256
  //   1b subchannelPicker <- null
257
  //   1c delayedTransport.shutdown()
258
  // 2. delayedTransport terminated: stop stream-creation functionality
259
  //   2a terminating <- true
260
  //   2b loadBalancer.shutdown()
261
  //     * LoadBalancer will shutdown subchannels and OOB channels
262
  //   2c loadBalancer <- null
263
  //   2d nameResolver.shutdown()
264
  //   2e nameResolver <- null
265
  // 3. All subchannels and OOB channels terminated: Channel considered terminated
266

267
  private final AtomicBoolean shutdown = new AtomicBoolean(false);
1✔
268
  // Must only be mutated and read from syncContext
269
  private boolean shutdownNowed;
270
  // Must only be mutated from syncContext
271
  private boolean terminating;
272
  // Must be mutated from syncContext
273
  private volatile boolean terminated;
274
  private final CountDownLatch terminatedLatch = new CountDownLatch(1);
1✔
275

276
  private final CallTracer.Factory callTracerFactory;
277
  private final CallTracer channelCallTracer;
278
  private final ChannelTracer channelTracer;
279
  private final ChannelLogger channelLogger;
280
  private final InternalChannelz channelz;
281
  private final RealChannel realChannel;
282
  // Must be mutated and read from syncContext
283
  // a flag for doing channel tracing when flipped
284
  private ResolutionState lastResolutionState = ResolutionState.NO_RESOLUTION;
1✔
285
  // Must be mutated and read from constructor or syncContext
286
  // used for channel tracing when value changed
287
  private ManagedChannelServiceConfig lastServiceConfig = EMPTY_SERVICE_CONFIG;
1✔
288

289
  @Nullable
290
  private final ManagedChannelServiceConfig defaultServiceConfig;
291
  // Must be mutated and read from constructor or syncContext
292
  private boolean serviceConfigUpdated = false;
1✔
293
  private final boolean lookUpServiceConfig;
294

295
  // One instance per channel.
296
  private final ChannelBufferMeter channelBufferUsed = new ChannelBufferMeter();
1✔
297

298
  private final long perRpcBufferLimit;
299
  private final long channelBufferLimit;
300

301
  // Temporary false flag that can skip the retry code path.
302
  private final boolean retryEnabled;
303

304
  private final Deadline.Ticker ticker = Deadline.getSystemTicker();
1✔
305

306
  // Called from syncContext
307
  private final ManagedClientTransport.Listener delayedTransportListener =
1✔
308
      new DelayedTransportListener();
309

310
  // Must be called from syncContext
311
  private void maybeShutdownNowSubchannels() {
312
    if (shutdownNowed) {
1✔
313
      for (InternalSubchannel subchannel : subchannels) {
1✔
314
        subchannel.shutdownNow(SHUTDOWN_NOW_STATUS);
1✔
315
      }
1✔
316
      for (OobChannel oobChannel : oobChannels) {
1✔
317
        oobChannel.getInternalSubchannel().shutdownNow(SHUTDOWN_NOW_STATUS);
1✔
318
      }
1✔
319
    }
320
  }
1✔
321

322
  // Must be accessed from syncContext
323
  @VisibleForTesting
1✔
324
  final InUseStateAggregator<Object> inUseStateAggregator = new IdleModeStateAggregator();
325

326
  @Override
327
  public ListenableFuture<ChannelStats> getStats() {
328
    final SettableFuture<ChannelStats> ret = SettableFuture.create();
1✔
329
    final class StatsFetcher implements Runnable {
1✔
330
      @Override
331
      public void run() {
332
        ChannelStats.Builder builder = new InternalChannelz.ChannelStats.Builder();
1✔
333
        channelCallTracer.updateBuilder(builder);
1✔
334
        channelTracer.updateBuilder(builder);
1✔
335
        builder.setTarget(target).setState(channelStateManager.getState());
1✔
336
        List<InternalWithLogId> children = new ArrayList<>();
1✔
337
        children.addAll(subchannels);
1✔
338
        children.addAll(oobChannels);
1✔
339
        builder.setSubchannels(children);
1✔
340
        ret.set(builder.build());
1✔
341
      }
1✔
342
    }
343

344
    // subchannels and oobchannels can only be accessed from syncContext
345
    syncContext.execute(new StatsFetcher());
1✔
346
    return ret;
1✔
347
  }
348

349
  @Override
350
  public InternalLogId getLogId() {
351
    return logId;
1✔
352
  }
353

354
  // Run from syncContext
355
  private class IdleModeTimer implements Runnable {
1✔
356

357
    @Override
358
    public void run() {
359
      // Workaround timer scheduled while in idle mode. This can happen from handleNotInUse() after
360
      // an explicit enterIdleMode() by the user. Protecting here as other locations are a bit too
361
      // subtle to change rapidly to resolve the channel panic. See #8714
362
      if (lbHelper == null) {
1✔
363
        return;
×
364
      }
365
      enterIdleMode();
1✔
366
    }
1✔
367
  }
368

369
  // Must be called from syncContext
370
  private void shutdownNameResolverAndLoadBalancer(boolean channelIsActive) {
371
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
372
    if (channelIsActive) {
1✔
373
      checkState(nameResolverStarted, "nameResolver is not started");
1✔
374
      checkState(lbHelper != null, "lbHelper is null");
1✔
375
    }
376
    if (nameResolver != null) {
1✔
377
      nameResolver.shutdown();
1✔
378
      nameResolverStarted = false;
1✔
379
      if (channelIsActive) {
1✔
380
        nameResolver = getNameResolver(
1✔
381
            targetUri, authorityOverride, nameResolverProvider, nameResolverArgs);
382
      } else {
383
        nameResolver = null;
1✔
384
      }
385
    }
386
    if (lbHelper != null) {
1✔
387
      lbHelper.lb.shutdown();
1✔
388
      lbHelper = null;
1✔
389
    }
390
    subchannelPicker = null;
1✔
391
  }
1✔
392

393
  /**
394
   * Make the channel exit idle mode, if it's in it.
395
   *
396
   * <p>Must be called from syncContext
397
   */
398
  @VisibleForTesting
399
  void exitIdleMode() {
400
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
401
    if (shutdown.get() || panicMode) {
1✔
402
      return;
1✔
403
    }
404
    if (inUseStateAggregator.isInUse()) {
1✔
405
      // Cancel the timer now, so that a racing due timer will not put Channel on idleness
406
      // when the caller of exitIdleMode() is about to use the returned loadBalancer.
407
      cancelIdleTimer(false);
1✔
408
    } else {
409
      // exitIdleMode() may be called outside of inUseStateAggregator.handleNotInUse() while
410
      // isInUse() == false, in which case we still need to schedule the timer.
411
      rescheduleIdleTimer();
1✔
412
    }
413
    if (lbHelper != null) {
1✔
414
      return;
1✔
415
    }
416
    channelLogger.log(ChannelLogLevel.INFO, "Exiting idle mode");
1✔
417
    LbHelperImpl lbHelper = new LbHelperImpl();
1✔
418
    lbHelper.lb = loadBalancerFactory.newLoadBalancer(lbHelper);
1✔
419
    // Delay setting lbHelper until fully initialized, since loadBalancerFactory is user code and
420
    // may throw. We don't want to confuse our state, even if we will enter panic mode.
421
    this.lbHelper = lbHelper;
1✔
422

423
    channelStateManager.gotoState(CONNECTING);
1✔
424
    NameResolverListener listener = new NameResolverListener(lbHelper, nameResolver);
1✔
425
    nameResolver.start(listener);
1✔
426
    nameResolverStarted = true;
1✔
427
  }
1✔
428

429
  // Must be run from syncContext
430
  private void enterIdleMode() {
431
    // nameResolver and loadBalancer are guaranteed to be non-null.  If any of them were null,
432
    // either the idleModeTimer ran twice without exiting the idle mode, or the task in shutdown()
433
    // did not cancel idleModeTimer, or enterIdle() ran while shutdown or in idle, all of
434
    // which are bugs.
435
    shutdownNameResolverAndLoadBalancer(true);
1✔
436
    delayedTransport.reprocess(null);
1✔
437
    channelLogger.log(ChannelLogLevel.INFO, "Entering IDLE state");
1✔
438
    channelStateManager.gotoState(IDLE);
1✔
439
    // If the inUseStateAggregator still considers pending calls to be queued up or the delayed
440
    // transport to be holding some we need to exit idle mode to give these calls a chance to
441
    // be processed.
442
    if (inUseStateAggregator.anyObjectInUse(pendingCallsInUseObject, delayedTransport)) {
1✔
443
      exitIdleMode();
1✔
444
    }
445
  }
1✔
446

447
  // Must be run from syncContext
448
  private void cancelIdleTimer(boolean permanent) {
449
    idleTimer.cancel(permanent);
1✔
450
  }
1✔
451

452
  // Always run from syncContext
453
  private void rescheduleIdleTimer() {
454
    if (idleTimeoutMillis == IDLE_TIMEOUT_MILLIS_DISABLE) {
1✔
455
      return;
1✔
456
    }
457
    idleTimer.reschedule(idleTimeoutMillis, TimeUnit.MILLISECONDS);
1✔
458
  }
1✔
459

460
  /**
461
   * Force name resolution refresh to happen immediately. Must be run
462
   * from syncContext.
463
   */
464
  private void refreshNameResolution() {
465
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
466
    if (nameResolverStarted) {
1✔
467
      nameResolver.refresh();
1✔
468
    }
469
  }
1✔
470

471
  private final class ChannelStreamProvider implements ClientStreamProvider {
1✔
472
    volatile Throttle throttle;
473

474
    private ClientTransport getTransport(PickSubchannelArgs args) {
475
      SubchannelPicker pickerCopy = subchannelPicker;
1✔
476
      if (shutdown.get()) {
1✔
477
        // If channel is shut down, delayedTransport is also shut down which will fail the stream
478
        // properly.
479
        return delayedTransport;
1✔
480
      }
481
      if (pickerCopy == null) {
1✔
482
        final class ExitIdleModeForTransport implements Runnable {
1✔
483
          @Override
484
          public void run() {
485
            exitIdleMode();
1✔
486
          }
1✔
487
        }
488

489
        syncContext.execute(new ExitIdleModeForTransport());
1✔
490
        return delayedTransport;
1✔
491
      }
492
      // There is no need to reschedule the idle timer here.
493
      //
494
      // pickerCopy != null, which means idle timer has not expired when this method starts.
495
      // Even if idle timer expires right after we grab pickerCopy, and it shuts down LoadBalancer
496
      // which calls Subchannel.shutdown(), the InternalSubchannel will be actually shutdown after
497
      // SUBCHANNEL_SHUTDOWN_DELAY_SECONDS, which gives the caller time to start RPC on it.
498
      //
499
      // In most cases the idle timer is scheduled to fire after the transport has created the
500
      // stream, which would have reported in-use state to the channel that would have cancelled
501
      // the idle timer.
502
      PickResult pickResult = pickerCopy.pickSubchannel(args);
1✔
503
      ClientTransport transport = GrpcUtil.getTransportFromPickResult(
1✔
504
          pickResult, args.getCallOptions().isWaitForReady());
1✔
505
      if (transport != null) {
1✔
506
        return transport;
1✔
507
      }
508
      return delayedTransport;
1✔
509
    }
510

511
    @Override
512
    public ClientStream newStream(
513
        final MethodDescriptor<?, ?> method,
514
        final CallOptions callOptions,
515
        final Metadata headers,
516
        final Context context) {
517
      if (!retryEnabled) {
1✔
518
        ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers(
1✔
519
            callOptions, headers, 0, /* isTransparentRetry= */ false);
520
        ClientTransport transport = getTransport(new PickSubchannelArgsImpl(
1✔
521
            method, headers, callOptions, new PickDetailsConsumerImpl(tracers)));
522
        Context origContext = context.attach();
1✔
523
        try {
524
          return transport.newStream(method, headers, callOptions, tracers);
1✔
525
        } finally {
526
          context.detach(origContext);
1✔
527
        }
528
      } else {
529
        MethodInfo methodInfo = callOptions.getOption(MethodInfo.KEY);
1✔
530
        final RetryPolicy retryPolicy = methodInfo == null ? null : methodInfo.retryPolicy;
1✔
531
        final HedgingPolicy hedgingPolicy = methodInfo == null ? null : methodInfo.hedgingPolicy;
1✔
532
        final class RetryStream<ReqT> extends RetriableStream<ReqT> {
533
          @SuppressWarnings("unchecked")
534
          RetryStream() {
1✔
535
            super(
1✔
536
                (MethodDescriptor<ReqT, ?>) method,
537
                headers,
538
                channelBufferUsed,
1✔
539
                perRpcBufferLimit,
1✔
540
                channelBufferLimit,
1✔
541
                getCallExecutor(callOptions),
1✔
542
                transportFactory.getScheduledExecutorService(),
1✔
543
                retryPolicy,
544
                hedgingPolicy,
545
                throttle);
546
          }
1✔
547

548
          @Override
549
          Status prestart() {
550
            return uncommittedRetriableStreamsRegistry.add(this);
1✔
551
          }
552

553
          @Override
554
          void postCommit() {
555
            uncommittedRetriableStreamsRegistry.remove(this);
1✔
556
          }
1✔
557

558
          @Override
559
          ClientStream newSubstream(
560
              Metadata newHeaders, ClientStreamTracer.Factory factory, int previousAttempts,
561
              boolean isTransparentRetry) {
562
            CallOptions newOptions = callOptions.withStreamTracerFactory(factory);
1✔
563
            ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers(
1✔
564
                newOptions, newHeaders, previousAttempts, isTransparentRetry);
565
            ClientTransport transport = getTransport(new PickSubchannelArgsImpl(
1✔
566
                method, newHeaders, newOptions, new PickDetailsConsumerImpl(tracers)));
567
            Context origContext = context.attach();
1✔
568
            try {
569
              return transport.newStream(method, newHeaders, newOptions, tracers);
1✔
570
            } finally {
571
              context.detach(origContext);
1✔
572
            }
573
          }
574
        }
575

576
        return new RetryStream<>();
1✔
577
      }
578
    }
579
  }
580

581
  private final ChannelStreamProvider transportProvider = new ChannelStreamProvider();
1✔
582

583
  private final Rescheduler idleTimer;
584
  private final MetricRecorder metricRecorder;
585

586
  ManagedChannelImpl(
587
      ManagedChannelImplBuilder builder,
588
      ClientTransportFactory clientTransportFactory,
589
      URI targetUri,
590
      NameResolverProvider nameResolverProvider,
591
      BackoffPolicy.Provider backoffPolicyProvider,
592
      ObjectPool<? extends Executor> balancerRpcExecutorPool,
593
      Supplier<Stopwatch> stopwatchSupplier,
594
      List<ClientInterceptor> interceptors,
595
      final TimeProvider timeProvider) {
1✔
596
    this.target = checkNotNull(builder.target, "target");
1✔
597
    this.logId = InternalLogId.allocate("Channel", target);
1✔
598
    this.timeProvider = checkNotNull(timeProvider, "timeProvider");
1✔
599
    this.executorPool = checkNotNull(builder.executorPool, "executorPool");
1✔
600
    this.executor = checkNotNull(executorPool.getObject(), "executor");
1✔
601
    this.originalChannelCreds = builder.channelCredentials;
1✔
602
    this.originalTransportFactory = clientTransportFactory;
1✔
603
    this.offloadExecutorHolder =
1✔
604
        new ExecutorHolder(checkNotNull(builder.offloadExecutorPool, "offloadExecutorPool"));
1✔
605
    this.transportFactory = new CallCredentialsApplyingTransportFactory(
1✔
606
        clientTransportFactory, builder.callCredentials, this.offloadExecutorHolder);
607
    this.oobTransportFactory = new CallCredentialsApplyingTransportFactory(
1✔
608
        clientTransportFactory, null, this.offloadExecutorHolder);
609
    this.scheduledExecutor =
1✔
610
        new RestrictedScheduledExecutor(transportFactory.getScheduledExecutorService());
1✔
611
    maxTraceEvents = builder.maxTraceEvents;
1✔
612
    channelTracer = new ChannelTracer(
1✔
613
        logId, builder.maxTraceEvents, timeProvider.currentTimeNanos(),
1✔
614
        "Channel for '" + target + "'");
615
    channelLogger = new ChannelLoggerImpl(channelTracer, timeProvider);
1✔
616
    ProxyDetector proxyDetector =
617
        builder.proxyDetector != null ? builder.proxyDetector : GrpcUtil.DEFAULT_PROXY_DETECTOR;
1✔
618
    this.retryEnabled = builder.retryEnabled;
1✔
619
    this.loadBalancerFactory = new AutoConfiguredLoadBalancerFactory(builder.defaultLbPolicy);
1✔
620
    this.nameResolverRegistry = builder.nameResolverRegistry;
1✔
621
    this.targetUri = checkNotNull(targetUri, "targetUri");
1✔
622
    this.nameResolverProvider = checkNotNull(nameResolverProvider, "nameResolverProvider");
1✔
623
    ScParser serviceConfigParser =
1✔
624
        new ScParser(
625
            retryEnabled,
626
            builder.maxRetryAttempts,
627
            builder.maxHedgedAttempts,
628
            loadBalancerFactory);
629
    this.authorityOverride = builder.authorityOverride;
1✔
630
    this.nameResolverArgs =
1✔
631
        NameResolver.Args.newBuilder()
1✔
632
            .setDefaultPort(builder.getDefaultPort())
1✔
633
            .setProxyDetector(proxyDetector)
1✔
634
            .setSynchronizationContext(syncContext)
1✔
635
            .setScheduledExecutorService(scheduledExecutor)
1✔
636
            .setServiceConfigParser(serviceConfigParser)
1✔
637
            .setChannelLogger(channelLogger)
1✔
638
            .setOffloadExecutor(this.offloadExecutorHolder)
1✔
639
            .setOverrideAuthority(this.authorityOverride)
1✔
640
            .build();
1✔
641
    this.nameResolver = getNameResolver(
1✔
642
        targetUri, authorityOverride, nameResolverProvider, nameResolverArgs);
643
    this.balancerRpcExecutorPool = checkNotNull(balancerRpcExecutorPool, "balancerRpcExecutorPool");
1✔
644
    this.balancerRpcExecutorHolder = new ExecutorHolder(balancerRpcExecutorPool);
1✔
645
    this.delayedTransport = new DelayedClientTransport(this.executor, this.syncContext);
1✔
646
    this.delayedTransport.start(delayedTransportListener);
1✔
647
    this.backoffPolicyProvider = backoffPolicyProvider;
1✔
648

649
    if (builder.defaultServiceConfig != null) {
1✔
650
      ConfigOrError parsedDefaultServiceConfig =
1✔
651
          serviceConfigParser.parseServiceConfig(builder.defaultServiceConfig);
1✔
652
      checkState(
1✔
653
          parsedDefaultServiceConfig.getError() == null,
1✔
654
          "Default config is invalid: %s",
655
          parsedDefaultServiceConfig.getError());
1✔
656
      this.defaultServiceConfig =
1✔
657
          (ManagedChannelServiceConfig) parsedDefaultServiceConfig.getConfig();
1✔
658
      this.lastServiceConfig = this.defaultServiceConfig;
1✔
659
    } else {
1✔
660
      this.defaultServiceConfig = null;
1✔
661
    }
662
    this.lookUpServiceConfig = builder.lookUpServiceConfig;
1✔
663
    realChannel = new RealChannel(nameResolver.getServiceAuthority());
1✔
664
    Channel channel = realChannel;
1✔
665
    if (builder.binlog != null) {
1✔
666
      channel = builder.binlog.wrapChannel(channel);
1✔
667
    }
668
    this.interceptorChannel = ClientInterceptors.intercept(channel, interceptors);
1✔
669
    this.transportFilters = new ArrayList<>(builder.transportFilters);
1✔
670
    this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier");
1✔
671
    if (builder.idleTimeoutMillis == IDLE_TIMEOUT_MILLIS_DISABLE) {
1✔
672
      this.idleTimeoutMillis = builder.idleTimeoutMillis;
1✔
673
    } else {
674
      checkArgument(
1✔
675
          builder.idleTimeoutMillis
676
              >= ManagedChannelImplBuilder.IDLE_MODE_MIN_TIMEOUT_MILLIS,
677
          "invalid idleTimeoutMillis %s", builder.idleTimeoutMillis);
678
      this.idleTimeoutMillis = builder.idleTimeoutMillis;
1✔
679
    }
680

681
    idleTimer = new Rescheduler(
1✔
682
        new IdleModeTimer(),
683
        syncContext,
684
        transportFactory.getScheduledExecutorService(),
1✔
685
        stopwatchSupplier.get());
1✔
686
    this.fullStreamDecompression = builder.fullStreamDecompression;
1✔
687
    this.decompressorRegistry = checkNotNull(builder.decompressorRegistry, "decompressorRegistry");
1✔
688
    this.compressorRegistry = checkNotNull(builder.compressorRegistry, "compressorRegistry");
1✔
689
    this.userAgent = builder.userAgent;
1✔
690

691
    this.channelBufferLimit = builder.retryBufferSize;
1✔
692
    this.perRpcBufferLimit = builder.perRpcBufferLimit;
1✔
693
    final class ChannelCallTracerFactory implements CallTracer.Factory {
1✔
694
      @Override
695
      public CallTracer create() {
696
        return new CallTracer(timeProvider);
1✔
697
      }
698
    }
699

700
    this.callTracerFactory = new ChannelCallTracerFactory();
1✔
701
    channelCallTracer = callTracerFactory.create();
1✔
702
    this.channelz = checkNotNull(builder.channelz);
1✔
703
    channelz.addRootChannel(this);
1✔
704

705
    if (!lookUpServiceConfig) {
1✔
706
      if (defaultServiceConfig != null) {
1✔
707
        channelLogger.log(
1✔
708
            ChannelLogLevel.INFO, "Service config look-up disabled, using default service config");
709
      }
710
      serviceConfigUpdated = true;
1✔
711
    }
712
    this.metricRecorder = new MetricRecorderImpl(builder.metricSinks,
1✔
713
        MetricInstrumentRegistry.getDefaultRegistry());
1✔
714
  }
1✔
715

716
  @VisibleForTesting
717
  static NameResolver getNameResolver(
718
      URI targetUri, @Nullable final String overrideAuthority,
719
      NameResolverProvider provider, NameResolver.Args nameResolverArgs) {
720
    NameResolver resolver = provider.newNameResolver(targetUri, nameResolverArgs);
1✔
721
    if (resolver == null) {
1✔
722
      throw new IllegalArgumentException("cannot create a NameResolver for " + targetUri);
1✔
723
    }
724

725
    // We wrap the name resolver in a RetryingNameResolver to give it the ability to retry failures.
726
    // TODO: After a transition period, all NameResolver implementations that need retry should use
727
    //       RetryingNameResolver directly and this step can be removed.
728
    NameResolver usedNameResolver = new RetryingNameResolver(resolver,
1✔
729
          new BackoffPolicyRetryScheduler(new ExponentialBackoffPolicy.Provider(),
730
              nameResolverArgs.getScheduledExecutorService(),
1✔
731
              nameResolverArgs.getSynchronizationContext()),
1✔
732
          nameResolverArgs.getSynchronizationContext());
1✔
733

734
    if (overrideAuthority == null) {
1✔
735
      return usedNameResolver;
1✔
736
    }
737

738
    return new ForwardingNameResolver(usedNameResolver) {
1✔
739
      @Override
740
      public String getServiceAuthority() {
741
        return overrideAuthority;
1✔
742
      }
743
    };
744
  }
745

746
  @VisibleForTesting
747
  InternalConfigSelector getConfigSelector() {
748
    return realChannel.configSelector.get();
1✔
749
  }
750

751
  /**
752
   * Initiates an orderly shutdown in which preexisting calls continue but new calls are immediately
753
   * cancelled.
754
   */
755
  @Override
756
  public ManagedChannelImpl shutdown() {
757
    channelLogger.log(ChannelLogLevel.DEBUG, "shutdown() called");
1✔
758
    if (!shutdown.compareAndSet(false, true)) {
1✔
759
      return this;
1✔
760
    }
761
    final class Shutdown implements Runnable {
1✔
762
      @Override
763
      public void run() {
764
        channelLogger.log(ChannelLogLevel.INFO, "Entering SHUTDOWN state");
1✔
765
        channelStateManager.gotoState(SHUTDOWN);
1✔
766
      }
1✔
767
    }
768

769
    syncContext.execute(new Shutdown());
1✔
770
    realChannel.shutdown();
1✔
771
    final class CancelIdleTimer implements Runnable {
1✔
772
      @Override
773
      public void run() {
774
        cancelIdleTimer(/* permanent= */ true);
1✔
775
      }
1✔
776
    }
777

778
    syncContext.execute(new CancelIdleTimer());
1✔
779
    return this;
1✔
780
  }
781

782
  /**
783
   * Initiates a forceful shutdown in which preexisting and new calls are cancelled. Although
784
   * forceful, the shutdown process is still not instantaneous; {@link #isTerminated()} will likely
785
   * return {@code false} immediately after this method returns.
786
   */
787
  @Override
788
  public ManagedChannelImpl shutdownNow() {
789
    channelLogger.log(ChannelLogLevel.DEBUG, "shutdownNow() called");
1✔
790
    shutdown();
1✔
791
    realChannel.shutdownNow();
1✔
792
    final class ShutdownNow implements Runnable {
1✔
793
      @Override
794
      public void run() {
795
        if (shutdownNowed) {
1✔
796
          return;
1✔
797
        }
798
        shutdownNowed = true;
1✔
799
        maybeShutdownNowSubchannels();
1✔
800
      }
1✔
801
    }
802

803
    syncContext.execute(new ShutdownNow());
1✔
804
    return this;
1✔
805
  }
806

807
  // Called from syncContext
808
  @VisibleForTesting
809
  void panic(final Throwable t) {
810
    if (panicMode) {
1✔
811
      // Preserve the first panic information
812
      return;
×
813
    }
814
    panicMode = true;
1✔
815
    cancelIdleTimer(/* permanent= */ true);
1✔
816
    shutdownNameResolverAndLoadBalancer(false);
1✔
817
    final class PanicSubchannelPicker extends SubchannelPicker {
1✔
818
      private final PickResult panicPickResult =
1✔
819
          PickResult.withDrop(
1✔
820
              Status.INTERNAL.withDescription("Panic! This is a bug!").withCause(t));
1✔
821

822
      @Override
823
      public PickResult pickSubchannel(PickSubchannelArgs args) {
824
        return panicPickResult;
1✔
825
      }
826

827
      @Override
828
      public String toString() {
829
        return MoreObjects.toStringHelper(PanicSubchannelPicker.class)
×
830
            .add("panicPickResult", panicPickResult)
×
831
            .toString();
×
832
      }
833
    }
834

835
    updateSubchannelPicker(new PanicSubchannelPicker());
1✔
836
    realChannel.updateConfigSelector(null);
1✔
837
    channelLogger.log(ChannelLogLevel.ERROR, "PANIC! Entering TRANSIENT_FAILURE");
1✔
838
    channelStateManager.gotoState(TRANSIENT_FAILURE);
1✔
839
  }
1✔
840

841
  @VisibleForTesting
842
  boolean isInPanicMode() {
843
    return panicMode;
1✔
844
  }
845

846
  // Called from syncContext
847
  private void updateSubchannelPicker(SubchannelPicker newPicker) {
848
    subchannelPicker = newPicker;
1✔
849
    delayedTransport.reprocess(newPicker);
1✔
850
  }
1✔
851

852
  @Override
853
  public boolean isShutdown() {
854
    return shutdown.get();
1✔
855
  }
856

857
  @Override
858
  public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
859
    return terminatedLatch.await(timeout, unit);
1✔
860
  }
861

862
  @Override
863
  public boolean isTerminated() {
864
    return terminated;
1✔
865
  }
866

867
  /*
868
   * Creates a new outgoing call on the channel.
869
   */
870
  @Override
871
  public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(MethodDescriptor<ReqT, RespT> method,
872
      CallOptions callOptions) {
873
    return interceptorChannel.newCall(method, callOptions);
1✔
874
  }
875

876
  @Override
877
  public String authority() {
878
    return interceptorChannel.authority();
1✔
879
  }
880

881
  private Executor getCallExecutor(CallOptions callOptions) {
882
    Executor executor = callOptions.getExecutor();
1✔
883
    if (executor == null) {
1✔
884
      executor = this.executor;
1✔
885
    }
886
    return executor;
1✔
887
  }
888

889
  private class RealChannel extends Channel {
890
    // Reference to null if no config selector is available from resolution result
891
    // Reference must be set() from syncContext
892
    private final AtomicReference<InternalConfigSelector> configSelector =
1✔
893
        new AtomicReference<>(INITIAL_PENDING_SELECTOR);
1✔
894
    // Set when the NameResolver is initially created. When we create a new NameResolver for the
895
    // same target, the new instance must have the same value.
896
    private final String authority;
897

898
    private final Channel clientCallImplChannel = new Channel() {
1✔
899
      @Override
900
      public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
901
          MethodDescriptor<RequestT, ResponseT> method, CallOptions callOptions) {
902
        return new ClientCallImpl<>(
1✔
903
            method,
904
            getCallExecutor(callOptions),
1✔
905
            callOptions,
906
            transportProvider,
1✔
907
            terminated ? null : transportFactory.getScheduledExecutorService(),
1✔
908
            channelCallTracer,
1✔
909
            null)
910
            .setFullStreamDecompression(fullStreamDecompression)
1✔
911
            .setDecompressorRegistry(decompressorRegistry)
1✔
912
            .setCompressorRegistry(compressorRegistry);
1✔
913
      }
914

915
      @Override
916
      public String authority() {
917
        return authority;
×
918
      }
919
    };
920

921
    private RealChannel(String authority) {
1✔
922
      this.authority =  checkNotNull(authority, "authority");
1✔
923
    }
1✔
924

925
    @Override
926
    public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(
927
        MethodDescriptor<ReqT, RespT> method, CallOptions callOptions) {
928
      if (configSelector.get() != INITIAL_PENDING_SELECTOR) {
1✔
929
        return newClientCall(method, callOptions);
1✔
930
      }
931
      syncContext.execute(new Runnable() {
1✔
932
        @Override
933
        public void run() {
934
          exitIdleMode();
1✔
935
        }
1✔
936
      });
937
      if (configSelector.get() != INITIAL_PENDING_SELECTOR) {
1✔
938
        // This is an optimization for the case (typically with InProcessTransport) when name
939
        // resolution result is immediately available at this point. Otherwise, some users'
940
        // tests might observe slight behavior difference from earlier grpc versions.
941
        return newClientCall(method, callOptions);
1✔
942
      }
943
      if (shutdown.get()) {
1✔
944
        // Return a failing ClientCall.
945
        return new ClientCall<ReqT, RespT>() {
×
946
          @Override
947
          public void start(Listener<RespT> responseListener, Metadata headers) {
948
            responseListener.onClose(SHUTDOWN_STATUS, new Metadata());
×
949
          }
×
950

951
          @Override public void request(int numMessages) {}
×
952

953
          @Override public void cancel(@Nullable String message, @Nullable Throwable cause) {}
×
954

955
          @Override public void halfClose() {}
×
956

957
          @Override public void sendMessage(ReqT message) {}
×
958
        };
959
      }
960
      Context context = Context.current();
1✔
961
      final PendingCall<ReqT, RespT> pendingCall = new PendingCall<>(context, method, callOptions);
1✔
962
      syncContext.execute(new Runnable() {
1✔
963
        @Override
964
        public void run() {
965
          if (configSelector.get() == INITIAL_PENDING_SELECTOR) {
1✔
966
            if (pendingCalls == null) {
1✔
967
              pendingCalls = new LinkedHashSet<>();
1✔
968
              inUseStateAggregator.updateObjectInUse(pendingCallsInUseObject, true);
1✔
969
            }
970
            pendingCalls.add(pendingCall);
1✔
971
          } else {
972
            pendingCall.reprocess();
1✔
973
          }
974
        }
1✔
975
      });
976
      return pendingCall;
1✔
977
    }
978

979
    // Must run in SynchronizationContext.
980
    void updateConfigSelector(@Nullable InternalConfigSelector config) {
981
      InternalConfigSelector prevConfig = configSelector.get();
1✔
982
      configSelector.set(config);
1✔
983
      if (prevConfig == INITIAL_PENDING_SELECTOR && pendingCalls != null) {
1✔
984
        for (RealChannel.PendingCall<?, ?> pendingCall : pendingCalls) {
1✔
985
          pendingCall.reprocess();
1✔
986
        }
1✔
987
      }
988
    }
1✔
989

990
    // Must run in SynchronizationContext.
991
    void onConfigError() {
992
      if (configSelector.get() == INITIAL_PENDING_SELECTOR) {
1✔
993
        updateConfigSelector(null);
1✔
994
      }
995
    }
1✔
996

997
    void shutdown() {
998
      final class RealChannelShutdown implements Runnable {
1✔
999
        @Override
1000
        public void run() {
1001
          if (pendingCalls == null) {
1✔
1002
            if (configSelector.get() == INITIAL_PENDING_SELECTOR) {
1✔
1003
              configSelector.set(null);
1✔
1004
            }
1005
            uncommittedRetriableStreamsRegistry.onShutdown(SHUTDOWN_STATUS);
1✔
1006
          }
1007
        }
1✔
1008
      }
1009

1010
      syncContext.execute(new RealChannelShutdown());
1✔
1011
    }
1✔
1012

1013
    void shutdownNow() {
1014
      final class RealChannelShutdownNow implements Runnable {
1✔
1015
        @Override
1016
        public void run() {
1017
          if (configSelector.get() == INITIAL_PENDING_SELECTOR) {
1✔
1018
            configSelector.set(null);
1✔
1019
          }
1020
          if (pendingCalls != null) {
1✔
1021
            for (RealChannel.PendingCall<?, ?> pendingCall : pendingCalls) {
1✔
1022
              pendingCall.cancel("Channel is forcefully shutdown", null);
1✔
1023
            }
1✔
1024
          }
1025
          uncommittedRetriableStreamsRegistry.onShutdownNow(SHUTDOWN_NOW_STATUS);
1✔
1026
        }
1✔
1027
      }
1028

1029
      syncContext.execute(new RealChannelShutdownNow());
1✔
1030
    }
1✔
1031

1032
    @Override
1033
    public String authority() {
1034
      return authority;
1✔
1035
    }
1036

1037
    private final class PendingCall<ReqT, RespT> extends DelayedClientCall<ReqT, RespT> {
1038
      final Context context;
1039
      final MethodDescriptor<ReqT, RespT> method;
1040
      final CallOptions callOptions;
1041
      private final long callCreationTime;
1042

1043
      PendingCall(
1044
          Context context, MethodDescriptor<ReqT, RespT> method, CallOptions callOptions) {
1✔
1045
        super(getCallExecutor(callOptions), scheduledExecutor, callOptions.getDeadline());
1✔
1046
        this.context = context;
1✔
1047
        this.method = method;
1✔
1048
        this.callOptions = callOptions;
1✔
1049
        this.callCreationTime = ticker.nanoTime();
1✔
1050
      }
1✔
1051

1052
      /** Called when it's ready to create a real call and reprocess the pending call. */
1053
      void reprocess() {
1054
        ClientCall<ReqT, RespT> realCall;
1055
        Context previous = context.attach();
1✔
1056
        try {
1057
          CallOptions delayResolutionOption = callOptions.withOption(NAME_RESOLUTION_DELAYED,
1✔
1058
              ticker.nanoTime() - callCreationTime);
1✔
1059
          realCall = newClientCall(method, delayResolutionOption);
1✔
1060
        } finally {
1061
          context.detach(previous);
1✔
1062
        }
1063
        Runnable toRun = setCall(realCall);
1✔
1064
        if (toRun == null) {
1✔
1065
          syncContext.execute(new PendingCallRemoval());
1✔
1066
        } else {
1067
          getCallExecutor(callOptions).execute(new Runnable() {
1✔
1068
            @Override
1069
            public void run() {
1070
              toRun.run();
1✔
1071
              syncContext.execute(new PendingCallRemoval());
1✔
1072
            }
1✔
1073
          });
1074
        }
1075
      }
1✔
1076

1077
      @Override
1078
      protected void callCancelled() {
1079
        super.callCancelled();
1✔
1080
        syncContext.execute(new PendingCallRemoval());
1✔
1081
      }
1✔
1082

1083
      final class PendingCallRemoval implements Runnable {
1✔
1084
        @Override
1085
        public void run() {
1086
          if (pendingCalls != null) {
1✔
1087
            pendingCalls.remove(PendingCall.this);
1✔
1088
            if (pendingCalls.isEmpty()) {
1✔
1089
              inUseStateAggregator.updateObjectInUse(pendingCallsInUseObject, false);
1✔
1090
              pendingCalls = null;
1✔
1091
              if (shutdown.get()) {
1✔
1092
                uncommittedRetriableStreamsRegistry.onShutdown(SHUTDOWN_STATUS);
1✔
1093
              }
1094
            }
1095
          }
1096
        }
1✔
1097
      }
1098
    }
1099

1100
    private <ReqT, RespT> ClientCall<ReqT, RespT> newClientCall(
1101
        MethodDescriptor<ReqT, RespT> method, CallOptions callOptions) {
1102
      InternalConfigSelector selector = configSelector.get();
1✔
1103
      if (selector == null) {
1✔
1104
        return clientCallImplChannel.newCall(method, callOptions);
1✔
1105
      }
1106
      if (selector instanceof ServiceConfigConvertedSelector) {
1✔
1107
        MethodInfo methodInfo =
1✔
1108
            ((ServiceConfigConvertedSelector) selector).config.getMethodConfig(method);
1✔
1109
        if (methodInfo != null) {
1✔
1110
          callOptions = callOptions.withOption(MethodInfo.KEY, methodInfo);
1✔
1111
        }
1112
        return clientCallImplChannel.newCall(method, callOptions);
1✔
1113
      }
1114
      return new ConfigSelectingClientCall<>(
1✔
1115
          selector, clientCallImplChannel, executor, method, callOptions);
1✔
1116
    }
1117
  }
1118

1119
  /**
1120
   * A client call for a given channel that applies a given config selector when it starts.
1121
   */
1122
  static final class ConfigSelectingClientCall<ReqT, RespT>
1123
      extends ForwardingClientCall<ReqT, RespT> {
1124

1125
    private final InternalConfigSelector configSelector;
1126
    private final Channel channel;
1127
    private final Executor callExecutor;
1128
    private final MethodDescriptor<ReqT, RespT> method;
1129
    private final Context context;
1130
    private CallOptions callOptions;
1131

1132
    private ClientCall<ReqT, RespT> delegate;
1133

1134
    ConfigSelectingClientCall(
1135
        InternalConfigSelector configSelector, Channel channel, Executor channelExecutor,
1136
        MethodDescriptor<ReqT, RespT> method,
1137
        CallOptions callOptions) {
1✔
1138
      this.configSelector = configSelector;
1✔
1139
      this.channel = channel;
1✔
1140
      this.method = method;
1✔
1141
      this.callExecutor =
1✔
1142
          callOptions.getExecutor() == null ? channelExecutor : callOptions.getExecutor();
1✔
1143
      this.callOptions = callOptions.withExecutor(callExecutor);
1✔
1144
      this.context = Context.current();
1✔
1145
    }
1✔
1146

1147
    @Override
1148
    protected ClientCall<ReqT, RespT> delegate() {
1149
      return delegate;
1✔
1150
    }
1151

1152
    @SuppressWarnings("unchecked")
1153
    @Override
1154
    public void start(Listener<RespT> observer, Metadata headers) {
1155
      PickSubchannelArgs args =
1✔
1156
          new PickSubchannelArgsImpl(method, headers, callOptions, NOOP_PICK_DETAILS_CONSUMER);
1✔
1157
      InternalConfigSelector.Result result = configSelector.selectConfig(args);
1✔
1158
      Status status = result.getStatus();
1✔
1159
      if (!status.isOk()) {
1✔
1160
        executeCloseObserverInContext(observer,
1✔
1161
            GrpcUtil.replaceInappropriateControlPlaneStatus(status));
1✔
1162
        delegate = (ClientCall<ReqT, RespT>) NOOP_CALL;
1✔
1163
        return;
1✔
1164
      }
1165
      ClientInterceptor interceptor = result.getInterceptor();
1✔
1166
      ManagedChannelServiceConfig config = (ManagedChannelServiceConfig) result.getConfig();
1✔
1167
      MethodInfo methodInfo = config.getMethodConfig(method);
1✔
1168
      if (methodInfo != null) {
1✔
1169
        callOptions = callOptions.withOption(MethodInfo.KEY, methodInfo);
1✔
1170
      }
1171
      if (interceptor != null) {
1✔
1172
        delegate = interceptor.interceptCall(method, callOptions, channel);
1✔
1173
      } else {
1174
        delegate = channel.newCall(method, callOptions);
×
1175
      }
1176
      delegate.start(observer, headers);
1✔
1177
    }
1✔
1178

1179
    private void executeCloseObserverInContext(
1180
        final Listener<RespT> observer, final Status status) {
1181
      class CloseInContext extends ContextRunnable {
1182
        CloseInContext() {
1✔
1183
          super(context);
1✔
1184
        }
1✔
1185

1186
        @Override
1187
        public void runInContext() {
1188
          observer.onClose(status, new Metadata());
1✔
1189
        }
1✔
1190
      }
1191

1192
      callExecutor.execute(new CloseInContext());
1✔
1193
    }
1✔
1194

1195
    @Override
1196
    public void cancel(@Nullable String message, @Nullable Throwable cause) {
1197
      if (delegate != null) {
×
1198
        delegate.cancel(message, cause);
×
1199
      }
1200
    }
×
1201
  }
1202

1203
  private static final ClientCall<Object, Object> NOOP_CALL = new ClientCall<Object, Object>() {
1✔
1204
    @Override
1205
    public void start(Listener<Object> responseListener, Metadata headers) {}
×
1206

1207
    @Override
1208
    public void request(int numMessages) {}
1✔
1209

1210
    @Override
1211
    public void cancel(String message, Throwable cause) {}
×
1212

1213
    @Override
1214
    public void halfClose() {}
×
1215

1216
    @Override
1217
    public void sendMessage(Object message) {}
×
1218

1219
    // Always returns {@code false}, since this is only used when the startup of the call fails.
1220
    @Override
1221
    public boolean isReady() {
1222
      return false;
×
1223
    }
1224
  };
1225

1226
  /**
1227
   * Terminate the channel if termination conditions are met.
1228
   */
1229
  // Must be run from syncContext
1230
  private void maybeTerminateChannel() {
1231
    if (terminated) {
1✔
1232
      return;
×
1233
    }
1234
    if (shutdown.get() && subchannels.isEmpty() && oobChannels.isEmpty()) {
1✔
1235
      channelLogger.log(ChannelLogLevel.INFO, "Terminated");
1✔
1236
      channelz.removeRootChannel(this);
1✔
1237
      executorPool.returnObject(executor);
1✔
1238
      balancerRpcExecutorHolder.release();
1✔
1239
      offloadExecutorHolder.release();
1✔
1240
      // Release the transport factory so that it can deallocate any resources.
1241
      transportFactory.close();
1✔
1242

1243
      terminated = true;
1✔
1244
      terminatedLatch.countDown();
1✔
1245
    }
1246
  }
1✔
1247

1248
  // Must be called from syncContext
1249
  private void handleInternalSubchannelState(ConnectivityStateInfo newState) {
1250
    if (newState.getState() == TRANSIENT_FAILURE || newState.getState() == IDLE) {
1✔
1251
      refreshNameResolution();
1✔
1252
    }
1253
  }
1✔
1254

1255
  @Override
1256
  @SuppressWarnings("deprecation")
1257
  public ConnectivityState getState(boolean requestConnection) {
1258
    ConnectivityState savedChannelState = channelStateManager.getState();
1✔
1259
    if (requestConnection && savedChannelState == IDLE) {
1✔
1260
      final class RequestConnection implements Runnable {
1✔
1261
        @Override
1262
        public void run() {
1263
          exitIdleMode();
1✔
1264
          if (subchannelPicker != null) {
1✔
1265
            subchannelPicker.requestConnection();
1✔
1266
          }
1267
          if (lbHelper != null) {
1✔
1268
            lbHelper.lb.requestConnection();
1✔
1269
          }
1270
        }
1✔
1271
      }
1272

1273
      syncContext.execute(new RequestConnection());
1✔
1274
    }
1275
    return savedChannelState;
1✔
1276
  }
1277

1278
  @Override
1279
  public void notifyWhenStateChanged(final ConnectivityState source, final Runnable callback) {
1280
    final class NotifyStateChanged implements Runnable {
1✔
1281
      @Override
1282
      public void run() {
1283
        channelStateManager.notifyWhenStateChanged(callback, executor, source);
1✔
1284
      }
1✔
1285
    }
1286

1287
    syncContext.execute(new NotifyStateChanged());
1✔
1288
  }
1✔
1289

1290
  @Override
1291
  public void resetConnectBackoff() {
1292
    final class ResetConnectBackoff implements Runnable {
1✔
1293
      @Override
1294
      public void run() {
1295
        if (shutdown.get()) {
1✔
1296
          return;
1✔
1297
        }
1298
        if (nameResolverStarted) {
1✔
1299
          refreshNameResolution();
1✔
1300
        }
1301
        for (InternalSubchannel subchannel : subchannels) {
1✔
1302
          subchannel.resetConnectBackoff();
1✔
1303
        }
1✔
1304
        for (OobChannel oobChannel : oobChannels) {
1✔
1305
          oobChannel.resetConnectBackoff();
×
1306
        }
×
1307
      }
1✔
1308
    }
1309

1310
    syncContext.execute(new ResetConnectBackoff());
1✔
1311
  }
1✔
1312

1313
  @Override
1314
  public void enterIdle() {
1315
    final class PrepareToLoseNetworkRunnable implements Runnable {
1✔
1316
      @Override
1317
      public void run() {
1318
        if (shutdown.get() || lbHelper == null) {
1✔
1319
          return;
1✔
1320
        }
1321
        cancelIdleTimer(/* permanent= */ false);
1✔
1322
        enterIdleMode();
1✔
1323
      }
1✔
1324
    }
1325

1326
    syncContext.execute(new PrepareToLoseNetworkRunnable());
1✔
1327
  }
1✔
1328

1329
  /**
1330
   * A registry that prevents channel shutdown from killing existing retry attempts that are in
1331
   * backoff.
1332
   */
1333
  private final class UncommittedRetriableStreamsRegistry {
1✔
1334
    // TODO(zdapeng): This means we would acquire a lock for each new retry-able stream,
1335
    // it's worthwhile to look for a lock-free approach.
1336
    final Object lock = new Object();
1✔
1337

1338
    @GuardedBy("lock")
1✔
1339
    Collection<ClientStream> uncommittedRetriableStreams = new HashSet<>();
1340

1341
    @GuardedBy("lock")
1342
    Status shutdownStatus;
1343

1344
    void onShutdown(Status reason) {
1345
      boolean shouldShutdownDelayedTransport = false;
1✔
1346
      synchronized (lock) {
1✔
1347
        if (shutdownStatus != null) {
1✔
1348
          return;
1✔
1349
        }
1350
        shutdownStatus = reason;
1✔
1351
        // Keep the delayedTransport open until there is no more uncommitted streams, b/c those
1352
        // retriable streams, which may be in backoff and not using any transport, are already
1353
        // started RPCs.
1354
        if (uncommittedRetriableStreams.isEmpty()) {
1✔
1355
          shouldShutdownDelayedTransport = true;
1✔
1356
        }
1357
      }
1✔
1358

1359
      if (shouldShutdownDelayedTransport) {
1✔
1360
        delayedTransport.shutdown(reason);
1✔
1361
      }
1362
    }
1✔
1363

1364
    void onShutdownNow(Status reason) {
1365
      onShutdown(reason);
1✔
1366
      Collection<ClientStream> streams;
1367

1368
      synchronized (lock) {
1✔
1369
        streams = new ArrayList<>(uncommittedRetriableStreams);
1✔
1370
      }
1✔
1371

1372
      for (ClientStream stream : streams) {
1✔
1373
        stream.cancel(reason);
1✔
1374
      }
1✔
1375
      delayedTransport.shutdownNow(reason);
1✔
1376
    }
1✔
1377

1378
    /**
1379
     * Registers a RetriableStream and return null if not shutdown, otherwise just returns the
1380
     * shutdown Status.
1381
     */
1382
    @Nullable
1383
    Status add(RetriableStream<?> retriableStream) {
1384
      synchronized (lock) {
1✔
1385
        if (shutdownStatus != null) {
1✔
1386
          return shutdownStatus;
1✔
1387
        }
1388
        uncommittedRetriableStreams.add(retriableStream);
1✔
1389
        return null;
1✔
1390
      }
1391
    }
1392

1393
    void remove(RetriableStream<?> retriableStream) {
1394
      Status shutdownStatusCopy = null;
1✔
1395

1396
      synchronized (lock) {
1✔
1397
        uncommittedRetriableStreams.remove(retriableStream);
1✔
1398
        if (uncommittedRetriableStreams.isEmpty()) {
1✔
1399
          shutdownStatusCopy = shutdownStatus;
1✔
1400
          // Because retriable transport is long-lived, we take this opportunity to down-size the
1401
          // hashmap.
1402
          uncommittedRetriableStreams = new HashSet<>();
1✔
1403
        }
1404
      }
1✔
1405

1406
      if (shutdownStatusCopy != null) {
1✔
1407
        delayedTransport.shutdown(shutdownStatusCopy);
1✔
1408
      }
1409
    }
1✔
1410
  }
1411

1412
  private final class LbHelperImpl extends LoadBalancer.Helper {
1✔
1413
    AutoConfiguredLoadBalancer lb;
1414

1415
    @Override
1416
    public AbstractSubchannel createSubchannel(CreateSubchannelArgs args) {
1417
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1418
      // No new subchannel should be created after load balancer has been shutdown.
1419
      checkState(!terminating, "Channel is being terminated");
1✔
1420
      return new SubchannelImpl(args);
1✔
1421
    }
1422

1423
    @Override
1424
    public void updateBalancingState(
1425
        final ConnectivityState newState, final SubchannelPicker newPicker) {
1426
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1427
      checkNotNull(newState, "newState");
1✔
1428
      checkNotNull(newPicker, "newPicker");
1✔
1429
      final class UpdateBalancingState implements Runnable {
1✔
1430
        @Override
1431
        public void run() {
1432
          if (LbHelperImpl.this != lbHelper) {
1✔
1433
            return;
1✔
1434
          }
1435
          updateSubchannelPicker(newPicker);
1✔
1436
          // It's not appropriate to report SHUTDOWN state from lb.
1437
          // Ignore the case of newState == SHUTDOWN for now.
1438
          if (newState != SHUTDOWN) {
1✔
1439
            channelLogger.log(
1✔
1440
                ChannelLogLevel.INFO, "Entering {0} state with picker: {1}", newState, newPicker);
1441
            channelStateManager.gotoState(newState);
1✔
1442
          }
1443
        }
1✔
1444
      }
1445

1446
      syncContext.execute(new UpdateBalancingState());
1✔
1447
    }
1✔
1448

1449
    @Override
1450
    public void refreshNameResolution() {
1451
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1452
      final class LoadBalancerRefreshNameResolution implements Runnable {
1✔
1453
        @Override
1454
        public void run() {
1455
          ManagedChannelImpl.this.refreshNameResolution();
1✔
1456
        }
1✔
1457
      }
1458

1459
      syncContext.execute(new LoadBalancerRefreshNameResolution());
1✔
1460
    }
1✔
1461

1462
    @Override
1463
    public ManagedChannel createOobChannel(EquivalentAddressGroup addressGroup, String authority) {
1464
      return createOobChannel(Collections.singletonList(addressGroup), authority);
×
1465
    }
1466

1467
    @Override
1468
    public ManagedChannel createOobChannel(List<EquivalentAddressGroup> addressGroup,
1469
        String authority) {
1470
      // TODO(ejona): can we be even stricter? Like terminating?
1471
      checkState(!terminated, "Channel is terminated");
1✔
1472
      long oobChannelCreationTime = timeProvider.currentTimeNanos();
1✔
1473
      InternalLogId oobLogId = InternalLogId.allocate("OobChannel", /*details=*/ null);
1✔
1474
      InternalLogId subchannelLogId =
1✔
1475
          InternalLogId.allocate("Subchannel-OOB", /*details=*/ authority);
1✔
1476
      ChannelTracer oobChannelTracer =
1✔
1477
          new ChannelTracer(
1478
              oobLogId, maxTraceEvents, oobChannelCreationTime,
1✔
1479
              "OobChannel for " + addressGroup);
1480
      final OobChannel oobChannel = new OobChannel(
1✔
1481
          authority, balancerRpcExecutorPool, oobTransportFactory.getScheduledExecutorService(),
1✔
1482
          syncContext, callTracerFactory.create(), oobChannelTracer, channelz, timeProvider);
1✔
1483
      channelTracer.reportEvent(new ChannelTrace.Event.Builder()
1✔
1484
          .setDescription("Child OobChannel created")
1✔
1485
          .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
1✔
1486
          .setTimestampNanos(oobChannelCreationTime)
1✔
1487
          .setChannelRef(oobChannel)
1✔
1488
          .build());
1✔
1489
      ChannelTracer subchannelTracer =
1✔
1490
          new ChannelTracer(subchannelLogId, maxTraceEvents, oobChannelCreationTime,
1✔
1491
              "Subchannel for " + addressGroup);
1492
      ChannelLogger subchannelLogger = new ChannelLoggerImpl(subchannelTracer, timeProvider);
1✔
1493
      final class ManagedOobChannelCallback extends InternalSubchannel.Callback {
1✔
1494
        @Override
1495
        void onTerminated(InternalSubchannel is) {
1496
          oobChannels.remove(oobChannel);
1✔
1497
          channelz.removeSubchannel(is);
1✔
1498
          oobChannel.handleSubchannelTerminated();
1✔
1499
          maybeTerminateChannel();
1✔
1500
        }
1✔
1501

1502
        @Override
1503
        void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) {
1504
          // TODO(chengyuanzhang): change to let LB policies explicitly manage OOB channel's
1505
          //  state and refresh name resolution if necessary.
1506
          handleInternalSubchannelState(newState);
1✔
1507
          oobChannel.handleSubchannelStateChange(newState);
1✔
1508
        }
1✔
1509
      }
1510

1511
      final InternalSubchannel internalSubchannel = new InternalSubchannel(
1✔
1512
          addressGroup,
1513
          authority, userAgent, backoffPolicyProvider, oobTransportFactory,
1✔
1514
          oobTransportFactory.getScheduledExecutorService(), stopwatchSupplier, syncContext,
1✔
1515
          // All callback methods are run from syncContext
1516
          new ManagedOobChannelCallback(),
1517
          channelz,
1✔
1518
          callTracerFactory.create(),
1✔
1519
          subchannelTracer,
1520
          subchannelLogId,
1521
          subchannelLogger,
1522
          transportFilters);
1✔
1523
      oobChannelTracer.reportEvent(new ChannelTrace.Event.Builder()
1✔
1524
          .setDescription("Child Subchannel created")
1✔
1525
          .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
1✔
1526
          .setTimestampNanos(oobChannelCreationTime)
1✔
1527
          .setSubchannelRef(internalSubchannel)
1✔
1528
          .build());
1✔
1529
      channelz.addSubchannel(oobChannel);
1✔
1530
      channelz.addSubchannel(internalSubchannel);
1✔
1531
      oobChannel.setSubchannel(internalSubchannel);
1✔
1532
      final class AddOobChannel implements Runnable {
1✔
1533
        @Override
1534
        public void run() {
1535
          if (terminating) {
1✔
1536
            oobChannel.shutdown();
×
1537
          }
1538
          if (!terminated) {
1✔
1539
            // If channel has not terminated, it will track the subchannel and block termination
1540
            // for it.
1541
            oobChannels.add(oobChannel);
1✔
1542
          }
1543
        }
1✔
1544
      }
1545

1546
      syncContext.execute(new AddOobChannel());
1✔
1547
      return oobChannel;
1✔
1548
    }
1549

1550
    @Deprecated
1551
    @Override
1552
    public ManagedChannelBuilder<?> createResolvingOobChannelBuilder(String target) {
1553
      return createResolvingOobChannelBuilder(target, new DefaultChannelCreds())
1✔
1554
          // Override authority to keep the old behavior.
1555
          // createResolvingOobChannelBuilder(String target) will be deleted soon.
1556
          .overrideAuthority(getAuthority());
1✔
1557
    }
1558

1559
    // TODO(creamsoup) prevent main channel to shutdown if oob channel is not terminated
1560
    // TODO(zdapeng) register the channel as a subchannel of the parent channel in channelz.
1561
    @Override
1562
    public ManagedChannelBuilder<?> createResolvingOobChannelBuilder(
1563
        final String target, final ChannelCredentials channelCreds) {
1564
      checkNotNull(channelCreds, "channelCreds");
1✔
1565

1566
      final class ResolvingOobChannelBuilder
1567
          extends ForwardingChannelBuilder2<ResolvingOobChannelBuilder> {
1568
        final ManagedChannelBuilder<?> delegate;
1569

1570
        ResolvingOobChannelBuilder() {
1✔
1571
          final ClientTransportFactory transportFactory;
1572
          CallCredentials callCredentials;
1573
          if (channelCreds instanceof DefaultChannelCreds) {
1✔
1574
            transportFactory = originalTransportFactory;
1✔
1575
            callCredentials = null;
1✔
1576
          } else {
1577
            SwapChannelCredentialsResult swapResult =
1✔
1578
                originalTransportFactory.swapChannelCredentials(channelCreds);
1✔
1579
            if (swapResult == null) {
1✔
1580
              delegate = Grpc.newChannelBuilder(target, channelCreds);
×
1581
              return;
×
1582
            } else {
1583
              transportFactory = swapResult.transportFactory;
1✔
1584
              callCredentials = swapResult.callCredentials;
1✔
1585
            }
1586
          }
1587
          ClientTransportFactoryBuilder transportFactoryBuilder =
1✔
1588
              new ClientTransportFactoryBuilder() {
1✔
1589
                @Override
1590
                public ClientTransportFactory buildClientTransportFactory() {
1591
                  return transportFactory;
1✔
1592
                }
1593
              };
1594
          delegate = new ManagedChannelImplBuilder(
1✔
1595
              target,
1596
              channelCreds,
1597
              callCredentials,
1598
              transportFactoryBuilder,
1599
              new FixedPortProvider(nameResolverArgs.getDefaultPort()))
1✔
1600
              .nameResolverRegistry(nameResolverRegistry);
1✔
1601
        }
1✔
1602

1603
        @Override
1604
        protected ManagedChannelBuilder<?> delegate() {
1605
          return delegate;
1✔
1606
        }
1607
      }
1608

1609
      checkState(!terminated, "Channel is terminated");
1✔
1610

1611
      @SuppressWarnings("deprecation")
1612
      ResolvingOobChannelBuilder builder = new ResolvingOobChannelBuilder();
1✔
1613

1614
      return builder
1✔
1615
          // TODO(zdapeng): executors should not outlive the parent channel.
1616
          .executor(executor)
1✔
1617
          .offloadExecutor(offloadExecutorHolder.getExecutor())
1✔
1618
          .maxTraceEvents(maxTraceEvents)
1✔
1619
          .proxyDetector(nameResolverArgs.getProxyDetector())
1✔
1620
          .userAgent(userAgent);
1✔
1621
    }
1622

1623
    @Override
1624
    public ChannelCredentials getUnsafeChannelCredentials() {
1625
      if (originalChannelCreds == null) {
×
1626
        return new DefaultChannelCreds();
×
1627
      }
1628
      return originalChannelCreds;
×
1629
    }
1630

1631
    @Override
1632
    public void updateOobChannelAddresses(ManagedChannel channel, EquivalentAddressGroup eag) {
1633
      updateOobChannelAddresses(channel, Collections.singletonList(eag));
×
1634
    }
×
1635

1636
    @Override
1637
    public void updateOobChannelAddresses(ManagedChannel channel,
1638
        List<EquivalentAddressGroup> eag) {
1639
      checkArgument(channel instanceof OobChannel,
1✔
1640
          "channel must have been returned from createOobChannel");
1641
      ((OobChannel) channel).updateAddresses(eag);
1✔
1642
    }
1✔
1643

1644
    @Override
1645
    public String getAuthority() {
1646
      return ManagedChannelImpl.this.authority();
1✔
1647
    }
1648

1649
    @Override
1650
    public String getChannelTarget() {
1651
      return targetUri.toString();
×
1652
    }
1653

1654
    @Override
1655
    public SynchronizationContext getSynchronizationContext() {
1656
      return syncContext;
1✔
1657
    }
1658

1659
    @Override
1660
    public ScheduledExecutorService getScheduledExecutorService() {
1661
      return scheduledExecutor;
1✔
1662
    }
1663

1664
    @Override
1665
    public ChannelLogger getChannelLogger() {
1666
      return channelLogger;
1✔
1667
    }
1668

1669
    @Override
1670
    public NameResolver.Args getNameResolverArgs() {
1671
      return nameResolverArgs;
1✔
1672
    }
1673

1674
    @Override
1675
    public NameResolverRegistry getNameResolverRegistry() {
1676
      return nameResolverRegistry;
1✔
1677
    }
1678

1679
    @Override
1680
    public MetricRecorder getMetricRecorder() {
1681
      return metricRecorder;
1✔
1682
    }
1683

1684
    /**
1685
     * A placeholder for channel creds if user did not specify channel creds for the channel.
1686
     */
1687
    // TODO(zdapeng): get rid of this class and let all ChannelBuilders always provide a non-null
1688
    //     channel creds.
1689
    final class DefaultChannelCreds extends ChannelCredentials {
1✔
1690
      @Override
1691
      public ChannelCredentials withoutBearerTokens() {
1692
        return this;
×
1693
      }
1694
    }
1695
  }
1696

1697
  final class NameResolverListener extends NameResolver.Listener2 {
1698
    final LbHelperImpl helper;
1699
    final NameResolver resolver;
1700

1701
    NameResolverListener(LbHelperImpl helperImpl, NameResolver resolver) {
1✔
1702
      this.helper = checkNotNull(helperImpl, "helperImpl");
1✔
1703
      this.resolver = checkNotNull(resolver, "resolver");
1✔
1704
    }
1✔
1705

1706
    @Override
1707
    public void onResult(final ResolutionResult resolutionResult) {
1708
      final class NamesResolved implements Runnable {
1✔
1709

1710
        @SuppressWarnings("ReferenceEquality")
1711
        @Override
1712
        public void run() {
1713
          if (ManagedChannelImpl.this.nameResolver != resolver) {
1✔
1714
            return;
1✔
1715
          }
1716

1717
          List<EquivalentAddressGroup> servers = resolutionResult.getAddresses();
1✔
1718
          channelLogger.log(
1✔
1719
              ChannelLogLevel.DEBUG,
1720
              "Resolved address: {0}, config={1}",
1721
              servers,
1722
              resolutionResult.getAttributes());
1✔
1723

1724
          if (lastResolutionState != ResolutionState.SUCCESS) {
1✔
1725
            channelLogger.log(ChannelLogLevel.INFO, "Address resolved: {0}", servers);
1✔
1726
            lastResolutionState = ResolutionState.SUCCESS;
1✔
1727
          }
1728

1729
          ConfigOrError configOrError = resolutionResult.getServiceConfig();
1✔
1730
          ResolutionResultListener resolutionResultListener = resolutionResult.getAttributes()
1✔
1731
              .get(RetryingNameResolver.RESOLUTION_RESULT_LISTENER_KEY);
1✔
1732
          InternalConfigSelector resolvedConfigSelector =
1✔
1733
              resolutionResult.getAttributes().get(InternalConfigSelector.KEY);
1✔
1734
          ManagedChannelServiceConfig validServiceConfig =
1735
              configOrError != null && configOrError.getConfig() != null
1✔
1736
                  ? (ManagedChannelServiceConfig) configOrError.getConfig()
1✔
1737
                  : null;
1✔
1738
          Status serviceConfigError = configOrError != null ? configOrError.getError() : null;
1✔
1739

1740
          ManagedChannelServiceConfig effectiveServiceConfig;
1741
          if (!lookUpServiceConfig) {
1✔
1742
            if (validServiceConfig != null) {
1✔
1743
              channelLogger.log(
1✔
1744
                  ChannelLogLevel.INFO,
1745
                  "Service config from name resolver discarded by channel settings");
1746
            }
1747
            effectiveServiceConfig =
1748
                defaultServiceConfig == null ? EMPTY_SERVICE_CONFIG : defaultServiceConfig;
1✔
1749
            if (resolvedConfigSelector != null) {
1✔
1750
              channelLogger.log(
1✔
1751
                  ChannelLogLevel.INFO,
1752
                  "Config selector from name resolver discarded by channel settings");
1753
            }
1754
            realChannel.updateConfigSelector(effectiveServiceConfig.getDefaultConfigSelector());
1✔
1755
          } else {
1756
            // Try to use config if returned from name resolver
1757
            // Otherwise, try to use the default config if available
1758
            if (validServiceConfig != null) {
1✔
1759
              effectiveServiceConfig = validServiceConfig;
1✔
1760
              if (resolvedConfigSelector != null) {
1✔
1761
                realChannel.updateConfigSelector(resolvedConfigSelector);
1✔
1762
                if (effectiveServiceConfig.getDefaultConfigSelector() != null) {
1✔
1763
                  channelLogger.log(
×
1764
                      ChannelLogLevel.DEBUG,
1765
                      "Method configs in service config will be discarded due to presence of"
1766
                          + "config-selector");
1767
                }
1768
              } else {
1769
                realChannel.updateConfigSelector(effectiveServiceConfig.getDefaultConfigSelector());
1✔
1770
              }
1771
            } else if (defaultServiceConfig != null) {
1✔
1772
              effectiveServiceConfig = defaultServiceConfig;
1✔
1773
              realChannel.updateConfigSelector(effectiveServiceConfig.getDefaultConfigSelector());
1✔
1774
              channelLogger.log(
1✔
1775
                  ChannelLogLevel.INFO,
1776
                  "Received no service config, using default service config");
1777
            } else if (serviceConfigError != null) {
1✔
1778
              if (!serviceConfigUpdated) {
1✔
1779
                // First DNS lookup has invalid service config, and cannot fall back to default
1780
                channelLogger.log(
1✔
1781
                    ChannelLogLevel.INFO,
1782
                    "Fallback to error due to invalid first service config without default config");
1783
                // This error could be an "inappropriate" control plane error that should not bleed
1784
                // through to client code using gRPC. We let them flow through here to the LB as
1785
                // we later check for these error codes when investigating pick results in
1786
                // GrpcUtil.getTransportFromPickResult().
1787
                onError(configOrError.getError());
1✔
1788
                if (resolutionResultListener != null) {
1✔
1789
                  resolutionResultListener.resolutionAttempted(configOrError.getError());
1✔
1790
                }
1791
                return;
1✔
1792
              } else {
1793
                effectiveServiceConfig = lastServiceConfig;
1✔
1794
              }
1795
            } else {
1796
              effectiveServiceConfig = EMPTY_SERVICE_CONFIG;
1✔
1797
              realChannel.updateConfigSelector(null);
1✔
1798
            }
1799
            if (!effectiveServiceConfig.equals(lastServiceConfig)) {
1✔
1800
              channelLogger.log(
1✔
1801
                  ChannelLogLevel.INFO,
1802
                  "Service config changed{0}",
1803
                  effectiveServiceConfig == EMPTY_SERVICE_CONFIG ? " to empty" : "");
1✔
1804
              lastServiceConfig = effectiveServiceConfig;
1✔
1805
              transportProvider.throttle = effectiveServiceConfig.getRetryThrottling();
1✔
1806
            }
1807

1808
            try {
1809
              // TODO(creamsoup): when `servers` is empty and lastResolutionStateCopy == SUCCESS
1810
              //  and lbNeedAddress, it shouldn't call the handleServiceConfigUpdate. But,
1811
              //  lbNeedAddress is not deterministic
1812
              serviceConfigUpdated = true;
1✔
1813
            } catch (RuntimeException re) {
×
1814
              logger.log(
×
1815
                  Level.WARNING,
1816
                  "[" + getLogId() + "] Unexpected exception from parsing service config",
×
1817
                  re);
1818
            }
1✔
1819
          }
1820

1821
          Attributes effectiveAttrs = resolutionResult.getAttributes();
1✔
1822
          // Call LB only if it's not shutdown.  If LB is shutdown, lbHelper won't match.
1823
          if (NameResolverListener.this.helper == ManagedChannelImpl.this.lbHelper) {
1✔
1824
            Attributes.Builder attrBuilder =
1✔
1825
                effectiveAttrs.toBuilder().discard(InternalConfigSelector.KEY);
1✔
1826
            Map<String, ?> healthCheckingConfig =
1✔
1827
                effectiveServiceConfig.getHealthCheckingConfig();
1✔
1828
            if (healthCheckingConfig != null) {
1✔
1829
              attrBuilder
1✔
1830
                  .set(LoadBalancer.ATTR_HEALTH_CHECKING_CONFIG, healthCheckingConfig)
1✔
1831
                  .build();
1✔
1832
            }
1833
            Attributes attributes = attrBuilder.build();
1✔
1834

1835
            Status addressAcceptanceStatus = helper.lb.tryAcceptResolvedAddresses(
1✔
1836
                ResolvedAddresses.newBuilder()
1✔
1837
                    .setAddresses(servers)
1✔
1838
                    .setAttributes(attributes)
1✔
1839
                    .setLoadBalancingPolicyConfig(effectiveServiceConfig.getLoadBalancingConfig())
1✔
1840
                    .build());
1✔
1841
            // If a listener is provided, let it know if the addresses were accepted.
1842
            if (resolutionResultListener != null) {
1✔
1843
              resolutionResultListener.resolutionAttempted(addressAcceptanceStatus);
1✔
1844
            }
1845
          }
1846
        }
1✔
1847
      }
1848

1849
      syncContext.execute(new NamesResolved());
1✔
1850
    }
1✔
1851

1852
    @Override
1853
    public void onError(final Status error) {
1854
      checkArgument(!error.isOk(), "the error status must not be OK");
1✔
1855
      final class NameResolverErrorHandler implements Runnable {
1✔
1856
        @Override
1857
        public void run() {
1858
          handleErrorInSyncContext(error);
1✔
1859
        }
1✔
1860
      }
1861

1862
      syncContext.execute(new NameResolverErrorHandler());
1✔
1863
    }
1✔
1864

1865
    private void handleErrorInSyncContext(Status error) {
1866
      logger.log(Level.WARNING, "[{0}] Failed to resolve name. status={1}",
1✔
1867
          new Object[] {getLogId(), error});
1✔
1868
      realChannel.onConfigError();
1✔
1869
      if (lastResolutionState != ResolutionState.ERROR) {
1✔
1870
        channelLogger.log(ChannelLogLevel.WARNING, "Failed to resolve name: {0}", error);
1✔
1871
        lastResolutionState = ResolutionState.ERROR;
1✔
1872
      }
1873
      // Call LB only if it's not shutdown.  If LB is shutdown, lbHelper won't match.
1874
      if (NameResolverListener.this.helper != ManagedChannelImpl.this.lbHelper) {
1✔
1875
        return;
1✔
1876
      }
1877

1878
      helper.lb.handleNameResolutionError(error);
1✔
1879
    }
1✔
1880
  }
1881

1882
  private final class SubchannelImpl extends AbstractSubchannel {
1883
    final CreateSubchannelArgs args;
1884
    final InternalLogId subchannelLogId;
1885
    final ChannelLoggerImpl subchannelLogger;
1886
    final ChannelTracer subchannelTracer;
1887
    List<EquivalentAddressGroup> addressGroups;
1888
    InternalSubchannel subchannel;
1889
    boolean started;
1890
    boolean shutdown;
1891
    ScheduledHandle delayedShutdownTask;
1892

1893
    SubchannelImpl(CreateSubchannelArgs args) {
1✔
1894
      checkNotNull(args, "args");
1✔
1895
      addressGroups = args.getAddresses();
1✔
1896
      if (authorityOverride != null) {
1✔
1897
        List<EquivalentAddressGroup> eagsWithoutOverrideAttr =
1✔
1898
            stripOverrideAuthorityAttributes(args.getAddresses());
1✔
1899
        args = args.toBuilder().setAddresses(eagsWithoutOverrideAttr).build();
1✔
1900
      }
1901
      this.args = args;
1✔
1902
      subchannelLogId = InternalLogId.allocate("Subchannel", /*details=*/ authority());
1✔
1903
      subchannelTracer = new ChannelTracer(
1✔
1904
          subchannelLogId, maxTraceEvents, timeProvider.currentTimeNanos(),
1✔
1905
          "Subchannel for " + args.getAddresses());
1✔
1906
      subchannelLogger = new ChannelLoggerImpl(subchannelTracer, timeProvider);
1✔
1907
    }
1✔
1908

1909
    @Override
1910
    public void start(final SubchannelStateListener listener) {
1911
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1912
      checkState(!started, "already started");
1✔
1913
      checkState(!shutdown, "already shutdown");
1✔
1914
      checkState(!terminating, "Channel is being terminated");
1✔
1915
      started = true;
1✔
1916
      final class ManagedInternalSubchannelCallback extends InternalSubchannel.Callback {
1✔
1917
        // All callbacks are run in syncContext
1918
        @Override
1919
        void onTerminated(InternalSubchannel is) {
1920
          subchannels.remove(is);
1✔
1921
          channelz.removeSubchannel(is);
1✔
1922
          maybeTerminateChannel();
1✔
1923
        }
1✔
1924

1925
        @Override
1926
        void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) {
1927
          checkState(listener != null, "listener is null");
1✔
1928
          listener.onSubchannelState(newState);
1✔
1929
        }
1✔
1930

1931
        @Override
1932
        void onInUse(InternalSubchannel is) {
1933
          inUseStateAggregator.updateObjectInUse(is, true);
1✔
1934
        }
1✔
1935

1936
        @Override
1937
        void onNotInUse(InternalSubchannel is) {
1938
          inUseStateAggregator.updateObjectInUse(is, false);
1✔
1939
        }
1✔
1940
      }
1941

1942
      final InternalSubchannel internalSubchannel = new InternalSubchannel(
1✔
1943
          args.getAddresses(),
1✔
1944
          authority(),
1✔
1945
          userAgent,
1✔
1946
          backoffPolicyProvider,
1✔
1947
          transportFactory,
1✔
1948
          transportFactory.getScheduledExecutorService(),
1✔
1949
          stopwatchSupplier,
1✔
1950
          syncContext,
1951
          new ManagedInternalSubchannelCallback(),
1952
          channelz,
1✔
1953
          callTracerFactory.create(),
1✔
1954
          subchannelTracer,
1955
          subchannelLogId,
1956
          subchannelLogger,
1957
          transportFilters);
1✔
1958

1959
      channelTracer.reportEvent(new ChannelTrace.Event.Builder()
1✔
1960
          .setDescription("Child Subchannel started")
1✔
1961
          .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
1✔
1962
          .setTimestampNanos(timeProvider.currentTimeNanos())
1✔
1963
          .setSubchannelRef(internalSubchannel)
1✔
1964
          .build());
1✔
1965

1966
      this.subchannel = internalSubchannel;
1✔
1967
      channelz.addSubchannel(internalSubchannel);
1✔
1968
      subchannels.add(internalSubchannel);
1✔
1969
    }
1✔
1970

1971
    @Override
1972
    InternalInstrumented<ChannelStats> getInstrumentedInternalSubchannel() {
1973
      checkState(started, "not started");
1✔
1974
      return subchannel;
1✔
1975
    }
1976

1977
    @Override
1978
    public void shutdown() {
1979
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1980
      if (subchannel == null) {
1✔
1981
        // start() was not successful
1982
        shutdown = true;
×
1983
        return;
×
1984
      }
1985
      if (shutdown) {
1✔
1986
        if (terminating && delayedShutdownTask != null) {
1✔
1987
          // shutdown() was previously called when terminating == false, thus a delayed shutdown()
1988
          // was scheduled.  Now since terminating == true, We should expedite the shutdown.
1989
          delayedShutdownTask.cancel();
×
1990
          delayedShutdownTask = null;
×
1991
          // Will fall through to the subchannel.shutdown() at the end.
1992
        } else {
1993
          return;
1✔
1994
        }
1995
      } else {
1996
        shutdown = true;
1✔
1997
      }
1998
      // Add a delay to shutdown to deal with the race between 1) a transport being picked and
1999
      // newStream() being called on it, and 2) its Subchannel is shut down by LoadBalancer (e.g.,
2000
      // because of address change, or because LoadBalancer is shutdown by Channel entering idle
2001
      // mode). If (2) wins, the app will see a spurious error. We work around this by delaying
2002
      // shutdown of Subchannel for a few seconds here.
2003
      //
2004
      // TODO(zhangkun83): consider a better approach
2005
      // (https://github.com/grpc/grpc-java/issues/2562).
2006
      if (!terminating) {
1✔
2007
        final class ShutdownSubchannel implements Runnable {
1✔
2008
          @Override
2009
          public void run() {
2010
            subchannel.shutdown(SUBCHANNEL_SHUTDOWN_STATUS);
1✔
2011
          }
1✔
2012
        }
2013

2014
        delayedShutdownTask = syncContext.schedule(
1✔
2015
            new LogExceptionRunnable(new ShutdownSubchannel()),
2016
            SUBCHANNEL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS,
2017
            transportFactory.getScheduledExecutorService());
1✔
2018
        return;
1✔
2019
      }
2020
      // When terminating == true, no more real streams will be created. It's safe and also
2021
      // desirable to shutdown timely.
2022
      subchannel.shutdown(SHUTDOWN_STATUS);
1✔
2023
    }
1✔
2024

2025
    @Override
2026
    public void requestConnection() {
2027
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
2028
      checkState(started, "not started");
1✔
2029
      subchannel.obtainActiveTransport();
1✔
2030
    }
1✔
2031

2032
    @Override
2033
    public List<EquivalentAddressGroup> getAllAddresses() {
2034
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
2035
      checkState(started, "not started");
1✔
2036
      return addressGroups;
1✔
2037
    }
2038

2039
    @Override
2040
    public Attributes getAttributes() {
2041
      return args.getAttributes();
1✔
2042
    }
2043

2044
    @Override
2045
    public String toString() {
2046
      return subchannelLogId.toString();
1✔
2047
    }
2048

2049
    @Override
2050
    public Channel asChannel() {
2051
      checkState(started, "not started");
1✔
2052
      return new SubchannelChannel(
1✔
2053
          subchannel, balancerRpcExecutorHolder.getExecutor(),
1✔
2054
          transportFactory.getScheduledExecutorService(),
1✔
2055
          callTracerFactory.create(),
1✔
2056
          new AtomicReference<InternalConfigSelector>(null));
2057
    }
2058

2059
    @Override
2060
    public Object getInternalSubchannel() {
2061
      checkState(started, "Subchannel is not started");
1✔
2062
      return subchannel;
1✔
2063
    }
2064

2065
    @Override
2066
    public ChannelLogger getChannelLogger() {
2067
      return subchannelLogger;
1✔
2068
    }
2069

2070
    @Override
2071
    public void updateAddresses(List<EquivalentAddressGroup> addrs) {
2072
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
2073
      addressGroups = addrs;
1✔
2074
      if (authorityOverride != null) {
1✔
2075
        addrs = stripOverrideAuthorityAttributes(addrs);
1✔
2076
      }
2077
      subchannel.updateAddresses(addrs);
1✔
2078
    }
1✔
2079

2080
    private List<EquivalentAddressGroup> stripOverrideAuthorityAttributes(
2081
        List<EquivalentAddressGroup> eags) {
2082
      List<EquivalentAddressGroup> eagsWithoutOverrideAttr = new ArrayList<>();
1✔
2083
      for (EquivalentAddressGroup eag : eags) {
1✔
2084
        EquivalentAddressGroup eagWithoutOverrideAttr = new EquivalentAddressGroup(
1✔
2085
            eag.getAddresses(),
1✔
2086
            eag.getAttributes().toBuilder().discard(ATTR_AUTHORITY_OVERRIDE).build());
1✔
2087
        eagsWithoutOverrideAttr.add(eagWithoutOverrideAttr);
1✔
2088
      }
1✔
2089
      return Collections.unmodifiableList(eagsWithoutOverrideAttr);
1✔
2090
    }
2091
  }
2092

2093
  @Override
2094
  public String toString() {
2095
    return MoreObjects.toStringHelper(this)
1✔
2096
        .add("logId", logId.getId())
1✔
2097
        .add("target", target)
1✔
2098
        .toString();
1✔
2099
  }
2100

2101
  /**
2102
   * Called from syncContext.
2103
   */
2104
  private final class DelayedTransportListener implements ManagedClientTransport.Listener {
1✔
2105
    @Override
2106
    public void transportShutdown(Status s) {
2107
      checkState(shutdown.get(), "Channel must have been shut down");
1✔
2108
    }
1✔
2109

2110
    @Override
2111
    public void transportReady() {
2112
      // Don't care
2113
    }
×
2114

2115
    @Override
2116
    public Attributes filterTransport(Attributes attributes) {
2117
      return attributes;
×
2118
    }
2119

2120
    @Override
2121
    public void transportInUse(final boolean inUse) {
2122
      inUseStateAggregator.updateObjectInUse(delayedTransport, inUse);
1✔
2123
    }
1✔
2124

2125
    @Override
2126
    public void transportTerminated() {
2127
      checkState(shutdown.get(), "Channel must have been shut down");
1✔
2128
      terminating = true;
1✔
2129
      shutdownNameResolverAndLoadBalancer(false);
1✔
2130
      // No need to call channelStateManager since we are already in SHUTDOWN state.
2131
      // Until LoadBalancer is shutdown, it may still create new subchannels.  We catch them
2132
      // here.
2133
      maybeShutdownNowSubchannels();
1✔
2134
      maybeTerminateChannel();
1✔
2135
    }
1✔
2136
  }
2137

2138
  /**
2139
   * Must be accessed from syncContext.
2140
   */
2141
  private final class IdleModeStateAggregator extends InUseStateAggregator<Object> {
1✔
2142
    @Override
2143
    protected void handleInUse() {
2144
      exitIdleMode();
1✔
2145
    }
1✔
2146

2147
    @Override
2148
    protected void handleNotInUse() {
2149
      if (shutdown.get()) {
1✔
2150
        return;
1✔
2151
      }
2152
      rescheduleIdleTimer();
1✔
2153
    }
1✔
2154
  }
2155

2156
  /**
2157
   * Lazily request for Executor from an executor pool.
2158
   * Also act as an Executor directly to simply run a cmd
2159
   */
2160
  @VisibleForTesting
2161
  static final class ExecutorHolder implements Executor {
2162
    private final ObjectPool<? extends Executor> pool;
2163
    private Executor executor;
2164

2165
    ExecutorHolder(ObjectPool<? extends Executor> executorPool) {
1✔
2166
      this.pool = checkNotNull(executorPool, "executorPool");
1✔
2167
    }
1✔
2168

2169
    synchronized Executor getExecutor() {
2170
      if (executor == null) {
1✔
2171
        executor = checkNotNull(pool.getObject(), "%s.getObject()", executor);
1✔
2172
      }
2173
      return executor;
1✔
2174
    }
2175

2176
    synchronized void release() {
2177
      if (executor != null) {
1✔
2178
        executor = pool.returnObject(executor);
1✔
2179
      }
2180
    }
1✔
2181

2182
    @Override
2183
    public void execute(Runnable command) {
2184
      getExecutor().execute(command);
1✔
2185
    }
1✔
2186
  }
2187

2188
  private static final class RestrictedScheduledExecutor implements ScheduledExecutorService {
2189
    final ScheduledExecutorService delegate;
2190

2191
    private RestrictedScheduledExecutor(ScheduledExecutorService delegate) {
1✔
2192
      this.delegate = checkNotNull(delegate, "delegate");
1✔
2193
    }
1✔
2194

2195
    @Override
2196
    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
2197
      return delegate.schedule(callable, delay, unit);
×
2198
    }
2199

2200
    @Override
2201
    public ScheduledFuture<?> schedule(Runnable cmd, long delay, TimeUnit unit) {
2202
      return delegate.schedule(cmd, delay, unit);
1✔
2203
    }
2204

2205
    @Override
2206
    public ScheduledFuture<?> scheduleAtFixedRate(
2207
        Runnable command, long initialDelay, long period, TimeUnit unit) {
2208
      return delegate.scheduleAtFixedRate(command, initialDelay, period, unit);
×
2209
    }
2210

2211
    @Override
2212
    public ScheduledFuture<?> scheduleWithFixedDelay(
2213
        Runnable command, long initialDelay, long delay, TimeUnit unit) {
2214
      return delegate.scheduleWithFixedDelay(command, initialDelay, delay, unit);
×
2215
    }
2216

2217
    @Override
2218
    public boolean awaitTermination(long timeout, TimeUnit unit)
2219
        throws InterruptedException {
2220
      return delegate.awaitTermination(timeout, unit);
×
2221
    }
2222

2223
    @Override
2224
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
2225
        throws InterruptedException {
2226
      return delegate.invokeAll(tasks);
×
2227
    }
2228

2229
    @Override
2230
    public <T> List<Future<T>> invokeAll(
2231
        Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
2232
        throws InterruptedException {
2233
      return delegate.invokeAll(tasks, timeout, unit);
×
2234
    }
2235

2236
    @Override
2237
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
2238
        throws InterruptedException, ExecutionException {
2239
      return delegate.invokeAny(tasks);
×
2240
    }
2241

2242
    @Override
2243
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
2244
        throws InterruptedException, ExecutionException, TimeoutException {
2245
      return delegate.invokeAny(tasks, timeout, unit);
×
2246
    }
2247

2248
    @Override
2249
    public boolean isShutdown() {
2250
      return delegate.isShutdown();
×
2251
    }
2252

2253
    @Override
2254
    public boolean isTerminated() {
2255
      return delegate.isTerminated();
×
2256
    }
2257

2258
    @Override
2259
    public void shutdown() {
2260
      throw new UnsupportedOperationException("Restricted: shutdown() is not allowed");
1✔
2261
    }
2262

2263
    @Override
2264
    public List<Runnable> shutdownNow() {
2265
      throw new UnsupportedOperationException("Restricted: shutdownNow() is not allowed");
1✔
2266
    }
2267

2268
    @Override
2269
    public <T> Future<T> submit(Callable<T> task) {
2270
      return delegate.submit(task);
×
2271
    }
2272

2273
    @Override
2274
    public Future<?> submit(Runnable task) {
2275
      return delegate.submit(task);
×
2276
    }
2277

2278
    @Override
2279
    public <T> Future<T> submit(Runnable task, T result) {
2280
      return delegate.submit(task, result);
×
2281
    }
2282

2283
    @Override
2284
    public void execute(Runnable command) {
2285
      delegate.execute(command);
×
2286
    }
×
2287
  }
2288

2289
  /**
2290
   * A ResolutionState indicates the status of last name resolution.
2291
   */
2292
  enum ResolutionState {
1✔
2293
    NO_RESOLUTION,
1✔
2294
    SUCCESS,
1✔
2295
    ERROR
1✔
2296
  }
2297
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc