• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19122

27 Mar 2024 06:40PM UTC coverage: 88.257% (-0.01%) from 88.268%
#19122

push

github

web-flow
core: Transition to CONNECTING immediately when exiting idle

The name resolver takes some time before it returns addresses. While waiting the channel will be IDLE instead of the proper CONNECTING. This generally doesn't matter since RPCs behave similarly for IDLE and CONNECTING, but is confusing for users when watching channel.getState() closely.

Fixes #10517.

31206 of 35358 relevant lines covered (88.26%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.5
/../core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
1
/*
2
 * Copyright 2016 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static com.google.common.base.Preconditions.checkState;
22
import static io.grpc.ClientStreamTracer.NAME_RESOLUTION_DELAYED;
23
import static io.grpc.ConnectivityState.CONNECTING;
24
import static io.grpc.ConnectivityState.IDLE;
25
import static io.grpc.ConnectivityState.SHUTDOWN;
26
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
27
import static io.grpc.EquivalentAddressGroup.ATTR_AUTHORITY_OVERRIDE;
28

29
import com.google.common.annotations.VisibleForTesting;
30
import com.google.common.base.MoreObjects;
31
import com.google.common.base.Stopwatch;
32
import com.google.common.base.Supplier;
33
import com.google.common.util.concurrent.ListenableFuture;
34
import com.google.common.util.concurrent.SettableFuture;
35
import io.grpc.Attributes;
36
import io.grpc.CallCredentials;
37
import io.grpc.CallOptions;
38
import io.grpc.Channel;
39
import io.grpc.ChannelCredentials;
40
import io.grpc.ChannelLogger;
41
import io.grpc.ChannelLogger.ChannelLogLevel;
42
import io.grpc.ClientCall;
43
import io.grpc.ClientInterceptor;
44
import io.grpc.ClientInterceptors;
45
import io.grpc.ClientStreamTracer;
46
import io.grpc.ClientTransportFilter;
47
import io.grpc.CompressorRegistry;
48
import io.grpc.ConnectivityState;
49
import io.grpc.ConnectivityStateInfo;
50
import io.grpc.Context;
51
import io.grpc.Deadline;
52
import io.grpc.DecompressorRegistry;
53
import io.grpc.EquivalentAddressGroup;
54
import io.grpc.ForwardingChannelBuilder2;
55
import io.grpc.ForwardingClientCall;
56
import io.grpc.Grpc;
57
import io.grpc.InternalChannelz;
58
import io.grpc.InternalChannelz.ChannelStats;
59
import io.grpc.InternalChannelz.ChannelTrace;
60
import io.grpc.InternalConfigSelector;
61
import io.grpc.InternalInstrumented;
62
import io.grpc.InternalLogId;
63
import io.grpc.InternalWithLogId;
64
import io.grpc.LoadBalancer;
65
import io.grpc.LoadBalancer.CreateSubchannelArgs;
66
import io.grpc.LoadBalancer.PickResult;
67
import io.grpc.LoadBalancer.PickSubchannelArgs;
68
import io.grpc.LoadBalancer.ResolvedAddresses;
69
import io.grpc.LoadBalancer.SubchannelPicker;
70
import io.grpc.LoadBalancer.SubchannelStateListener;
71
import io.grpc.ManagedChannel;
72
import io.grpc.ManagedChannelBuilder;
73
import io.grpc.Metadata;
74
import io.grpc.MethodDescriptor;
75
import io.grpc.NameResolver;
76
import io.grpc.NameResolver.ConfigOrError;
77
import io.grpc.NameResolver.ResolutionResult;
78
import io.grpc.NameResolverProvider;
79
import io.grpc.NameResolverRegistry;
80
import io.grpc.ProxyDetector;
81
import io.grpc.Status;
82
import io.grpc.SynchronizationContext;
83
import io.grpc.SynchronizationContext.ScheduledHandle;
84
import io.grpc.internal.AutoConfiguredLoadBalancerFactory.AutoConfiguredLoadBalancer;
85
import io.grpc.internal.ClientCallImpl.ClientStreamProvider;
86
import io.grpc.internal.ClientTransportFactory.SwapChannelCredentialsResult;
87
import io.grpc.internal.ManagedChannelImplBuilder.ClientTransportFactoryBuilder;
88
import io.grpc.internal.ManagedChannelImplBuilder.FixedPortProvider;
89
import io.grpc.internal.ManagedChannelServiceConfig.MethodInfo;
90
import io.grpc.internal.ManagedChannelServiceConfig.ServiceConfigConvertedSelector;
91
import io.grpc.internal.RetriableStream.ChannelBufferMeter;
92
import io.grpc.internal.RetriableStream.Throttle;
93
import io.grpc.internal.RetryingNameResolver.ResolutionResultListener;
94
import java.net.SocketAddress;
95
import java.net.URI;
96
import java.net.URISyntaxException;
97
import java.util.ArrayList;
98
import java.util.Collection;
99
import java.util.Collections;
100
import java.util.HashSet;
101
import java.util.LinkedHashSet;
102
import java.util.List;
103
import java.util.Map;
104
import java.util.Set;
105
import java.util.concurrent.Callable;
106
import java.util.concurrent.CountDownLatch;
107
import java.util.concurrent.ExecutionException;
108
import java.util.concurrent.Executor;
109
import java.util.concurrent.Future;
110
import java.util.concurrent.ScheduledExecutorService;
111
import java.util.concurrent.ScheduledFuture;
112
import java.util.concurrent.TimeUnit;
113
import java.util.concurrent.TimeoutException;
114
import java.util.concurrent.atomic.AtomicBoolean;
115
import java.util.concurrent.atomic.AtomicReference;
116
import java.util.logging.Level;
117
import java.util.logging.Logger;
118
import java.util.regex.Pattern;
119
import javax.annotation.Nullable;
120
import javax.annotation.concurrent.GuardedBy;
121
import javax.annotation.concurrent.ThreadSafe;
122

123
/** A communication channel for making outgoing RPCs. */
124
@ThreadSafe
125
final class ManagedChannelImpl extends ManagedChannel implements
126
    InternalInstrumented<ChannelStats> {
127
  @VisibleForTesting
128
  static final Logger logger = Logger.getLogger(ManagedChannelImpl.class.getName());
1✔
129

130
  // Matching this pattern means the target string is a URI target or at least intended to be one.
131
  // A URI target must be an absolute hierarchical URI.
132
  // From RFC 2396: scheme = alpha *( alpha | digit | "+" | "-" | "." )
133
  @VisibleForTesting
134
  static final Pattern URI_PATTERN = Pattern.compile("[a-zA-Z][a-zA-Z0-9+.-]*:/.*");
1✔
135

136
  static final long IDLE_TIMEOUT_MILLIS_DISABLE = -1;
137

138
  static final long SUBCHANNEL_SHUTDOWN_DELAY_SECONDS = 5;
139

140
  @VisibleForTesting
141
  static final Status SHUTDOWN_NOW_STATUS =
1✔
142
      Status.UNAVAILABLE.withDescription("Channel shutdownNow invoked");
1✔
143

144
  @VisibleForTesting
145
  static final Status SHUTDOWN_STATUS =
1✔
146
      Status.UNAVAILABLE.withDescription("Channel shutdown invoked");
1✔
147

148
  @VisibleForTesting
149
  static final Status SUBCHANNEL_SHUTDOWN_STATUS =
1✔
150
      Status.UNAVAILABLE.withDescription("Subchannel shutdown invoked");
1✔
151

152
  private static final ManagedChannelServiceConfig EMPTY_SERVICE_CONFIG =
153
      ManagedChannelServiceConfig.empty();
1✔
154
  private static final InternalConfigSelector INITIAL_PENDING_SELECTOR =
1✔
155
      new InternalConfigSelector() {
1✔
156
        @Override
157
        public Result selectConfig(PickSubchannelArgs args) {
158
          throw new IllegalStateException("Resolution is pending");
×
159
        }
160
      };
161

162
  private final InternalLogId logId;
163
  private final String target;
164
  @Nullable
165
  private final String authorityOverride;
166
  private final NameResolverRegistry nameResolverRegistry;
167
  private final NameResolver.Args nameResolverArgs;
168
  private final AutoConfiguredLoadBalancerFactory loadBalancerFactory;
169
  private final ClientTransportFactory originalTransportFactory;
170
  @Nullable
171
  private final ChannelCredentials originalChannelCreds;
172
  private final ClientTransportFactory transportFactory;
173
  private final ClientTransportFactory oobTransportFactory;
174
  private final RestrictedScheduledExecutor scheduledExecutor;
175
  private final Executor executor;
176
  private final ObjectPool<? extends Executor> executorPool;
177
  private final ObjectPool<? extends Executor> balancerRpcExecutorPool;
178
  private final ExecutorHolder balancerRpcExecutorHolder;
179
  private final ExecutorHolder offloadExecutorHolder;
180
  private final TimeProvider timeProvider;
181
  private final int maxTraceEvents;
182

183
  @VisibleForTesting
1✔
184
  final SynchronizationContext syncContext = new SynchronizationContext(
185
      new Thread.UncaughtExceptionHandler() {
1✔
186
        @Override
187
        public void uncaughtException(Thread t, Throwable e) {
188
          logger.log(
1✔
189
              Level.SEVERE,
190
              "[" + getLogId() + "] Uncaught exception in the SynchronizationContext. Panic!",
1✔
191
              e);
192
          panic(e);
1✔
193
        }
1✔
194
      });
195

196
  private boolean fullStreamDecompression;
197

198
  private final DecompressorRegistry decompressorRegistry;
199
  private final CompressorRegistry compressorRegistry;
200

201
  private final Supplier<Stopwatch> stopwatchSupplier;
202
  /** The timeout before entering idle mode. */
203
  private final long idleTimeoutMillis;
204

205
  private final ConnectivityStateManager channelStateManager = new ConnectivityStateManager();
1✔
206
  private final BackoffPolicy.Provider backoffPolicyProvider;
207

208
  /**
209
   * We delegate to this channel, so that we can have interceptors as necessary. If there aren't
210
   * any interceptors and the {@link io.grpc.BinaryLog} is {@code null} then this will just be a
211
   * {@link RealChannel}.
212
   */
213
  private final Channel interceptorChannel;
214

215
  private final List<ClientTransportFilter> transportFilters;
216
  @Nullable private final String userAgent;
217

218
  // Only null after channel is terminated. Must be assigned from the syncContext.
219
  private NameResolver nameResolver;
220

221
  // Must be accessed from the syncContext.
222
  private boolean nameResolverStarted;
223

224
  // null when channel is in idle mode.  Must be assigned from syncContext.
225
  @Nullable
226
  private LbHelperImpl lbHelper;
227

228
  // Must ONLY be assigned from updateSubchannelPicker(), which is called from syncContext.
229
  // null if channel is in idle mode.
230
  @Nullable
231
  private volatile SubchannelPicker subchannelPicker;
232

233
  // Must be accessed from the syncContext
234
  private boolean panicMode;
235

236
  // Must be mutated from syncContext
237
  // If any monitoring hook to be added later needs to get a snapshot of this Set, we could
238
  // switch to a ConcurrentHashMap.
239
  private final Set<InternalSubchannel> subchannels = new HashSet<>(16, .75f);
1✔
240

241
  // Must be accessed from syncContext
242
  @Nullable
243
  private Collection<RealChannel.PendingCall<?, ?>> pendingCalls;
244
  private final Object pendingCallsInUseObject = new Object();
1✔
245

246
  // Must be mutated from syncContext
247
  private final Set<OobChannel> oobChannels = new HashSet<>(1, .75f);
1✔
248

249
  // reprocess() must be run from syncContext
250
  private final DelayedClientTransport delayedTransport;
251
  private final UncommittedRetriableStreamsRegistry uncommittedRetriableStreamsRegistry
1✔
252
      = new UncommittedRetriableStreamsRegistry();
253

254
  // Shutdown states.
255
  //
256
  // Channel's shutdown process:
257
  // 1. shutdown(): stop accepting new calls from applications
258
  //   1a shutdown <- true
259
  //   1b subchannelPicker <- null
260
  //   1c delayedTransport.shutdown()
261
  // 2. delayedTransport terminated: stop stream-creation functionality
262
  //   2a terminating <- true
263
  //   2b loadBalancer.shutdown()
264
  //     * LoadBalancer will shutdown subchannels and OOB channels
265
  //   2c loadBalancer <- null
266
  //   2d nameResolver.shutdown()
267
  //   2e nameResolver <- null
268
  // 3. All subchannels and OOB channels terminated: Channel considered terminated
269

270
  private final AtomicBoolean shutdown = new AtomicBoolean(false);
1✔
271
  // Must only be mutated and read from syncContext
272
  private boolean shutdownNowed;
273
  // Must only be mutated from syncContext
274
  private boolean terminating;
275
  // Must be mutated from syncContext
276
  private volatile boolean terminated;
277
  private final CountDownLatch terminatedLatch = new CountDownLatch(1);
1✔
278

279
  private final CallTracer.Factory callTracerFactory;
280
  private final CallTracer channelCallTracer;
281
  private final ChannelTracer channelTracer;
282
  private final ChannelLogger channelLogger;
283
  private final InternalChannelz channelz;
284
  private final RealChannel realChannel;
285
  // Must be mutated and read from syncContext
286
  // a flag for doing channel tracing when flipped
287
  private ResolutionState lastResolutionState = ResolutionState.NO_RESOLUTION;
1✔
288
  // Must be mutated and read from constructor or syncContext
289
  // used for channel tracing when value changed
290
  private ManagedChannelServiceConfig lastServiceConfig = EMPTY_SERVICE_CONFIG;
1✔
291

292
  @Nullable
293
  private final ManagedChannelServiceConfig defaultServiceConfig;
294
  // Must be mutated and read from constructor or syncContext
295
  private boolean serviceConfigUpdated = false;
1✔
296
  private final boolean lookUpServiceConfig;
297

298
  // One instance per channel.
299
  private final ChannelBufferMeter channelBufferUsed = new ChannelBufferMeter();
1✔
300

301
  private final long perRpcBufferLimit;
302
  private final long channelBufferLimit;
303

304
  // Temporary false flag that can skip the retry code path.
305
  private final boolean retryEnabled;
306

307
  private final Deadline.Ticker ticker = Deadline.getSystemTicker();
1✔
308

309
  // Called from syncContext
310
  private final ManagedClientTransport.Listener delayedTransportListener =
1✔
311
      new DelayedTransportListener();
312

313
  // Must be called from syncContext
314
  private void maybeShutdownNowSubchannels() {
315
    if (shutdownNowed) {
1✔
316
      for (InternalSubchannel subchannel : subchannels) {
1✔
317
        subchannel.shutdownNow(SHUTDOWN_NOW_STATUS);
1✔
318
      }
1✔
319
      for (OobChannel oobChannel : oobChannels) {
1✔
320
        oobChannel.getInternalSubchannel().shutdownNow(SHUTDOWN_NOW_STATUS);
1✔
321
      }
1✔
322
    }
323
  }
1✔
324

325
  // Must be accessed from syncContext
326
  @VisibleForTesting
1✔
327
  final InUseStateAggregator<Object> inUseStateAggregator = new IdleModeStateAggregator();
328

329
  @Override
330
  public ListenableFuture<ChannelStats> getStats() {
331
    final SettableFuture<ChannelStats> ret = SettableFuture.create();
1✔
332
    final class StatsFetcher implements Runnable {
1✔
333
      @Override
334
      public void run() {
335
        ChannelStats.Builder builder = new InternalChannelz.ChannelStats.Builder();
1✔
336
        channelCallTracer.updateBuilder(builder);
1✔
337
        channelTracer.updateBuilder(builder);
1✔
338
        builder.setTarget(target).setState(channelStateManager.getState());
1✔
339
        List<InternalWithLogId> children = new ArrayList<>();
1✔
340
        children.addAll(subchannels);
1✔
341
        children.addAll(oobChannels);
1✔
342
        builder.setSubchannels(children);
1✔
343
        ret.set(builder.build());
1✔
344
      }
1✔
345
    }
346

347
    // subchannels and oobchannels can only be accessed from syncContext
348
    syncContext.execute(new StatsFetcher());
1✔
349
    return ret;
1✔
350
  }
351

352
  @Override
353
  public InternalLogId getLogId() {
354
    return logId;
1✔
355
  }
356

357
  // Run from syncContext
358
  private class IdleModeTimer implements Runnable {
1✔
359

360
    @Override
361
    public void run() {
362
      // Workaround timer scheduled while in idle mode. This can happen from handleNotInUse() after
363
      // an explicit enterIdleMode() by the user. Protecting here as other locations are a bit too
364
      // subtle to change rapidly to resolve the channel panic. See #8714
365
      if (lbHelper == null) {
1✔
366
        return;
×
367
      }
368
      enterIdleMode();
1✔
369
    }
1✔
370
  }
371

372
  // Must be called from syncContext
373
  private void shutdownNameResolverAndLoadBalancer(boolean channelIsActive) {
374
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
375
    if (channelIsActive) {
1✔
376
      checkState(nameResolverStarted, "nameResolver is not started");
1✔
377
      checkState(lbHelper != null, "lbHelper is null");
1✔
378
    }
379
    if (nameResolver != null) {
1✔
380
      nameResolver.shutdown();
1✔
381
      nameResolverStarted = false;
1✔
382
      if (channelIsActive) {
1✔
383
        nameResolver = getNameResolver(
1✔
384
            target, authorityOverride, nameResolverRegistry, nameResolverArgs,
385
            transportFactory.getSupportedSocketAddressTypes());
1✔
386
      } else {
387
        nameResolver = null;
1✔
388
      }
389
    }
390
    if (lbHelper != null) {
1✔
391
      lbHelper.lb.shutdown();
1✔
392
      lbHelper = null;
1✔
393
    }
394
    subchannelPicker = null;
1✔
395
  }
1✔
396

397
  /**
398
   * Make the channel exit idle mode, if it's in it.
399
   *
400
   * <p>Must be called from syncContext
401
   */
402
  @VisibleForTesting
403
  void exitIdleMode() {
404
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
405
    if (shutdown.get() || panicMode) {
1✔
406
      return;
1✔
407
    }
408
    if (inUseStateAggregator.isInUse()) {
1✔
409
      // Cancel the timer now, so that a racing due timer will not put Channel on idleness
410
      // when the caller of exitIdleMode() is about to use the returned loadBalancer.
411
      cancelIdleTimer(false);
1✔
412
    } else {
413
      // exitIdleMode() may be called outside of inUseStateAggregator.handleNotInUse() while
414
      // isInUse() == false, in which case we still need to schedule the timer.
415
      rescheduleIdleTimer();
1✔
416
    }
417
    if (lbHelper != null) {
1✔
418
      return;
1✔
419
    }
420
    channelLogger.log(ChannelLogLevel.INFO, "Exiting idle mode");
1✔
421
    LbHelperImpl lbHelper = new LbHelperImpl();
1✔
422
    lbHelper.lb = loadBalancerFactory.newLoadBalancer(lbHelper);
1✔
423
    // Delay setting lbHelper until fully initialized, since loadBalancerFactory is user code and
424
    // may throw. We don't want to confuse our state, even if we will enter panic mode.
425
    this.lbHelper = lbHelper;
1✔
426

427
    channelStateManager.gotoState(CONNECTING);
1✔
428
    NameResolverListener listener = new NameResolverListener(lbHelper, nameResolver);
1✔
429
    nameResolver.start(listener);
1✔
430
    nameResolverStarted = true;
1✔
431
  }
1✔
432

433
  // Must be run from syncContext
434
  private void enterIdleMode() {
435
    // nameResolver and loadBalancer are guaranteed to be non-null.  If any of them were null,
436
    // either the idleModeTimer ran twice without exiting the idle mode, or the task in shutdown()
437
    // did not cancel idleModeTimer, or enterIdle() ran while shutdown or in idle, all of
438
    // which are bugs.
439
    shutdownNameResolverAndLoadBalancer(true);
1✔
440
    delayedTransport.reprocess(null);
1✔
441
    channelLogger.log(ChannelLogLevel.INFO, "Entering IDLE state");
1✔
442
    channelStateManager.gotoState(IDLE);
1✔
443
    // If the inUseStateAggregator still considers pending calls to be queued up or the delayed
444
    // transport to be holding some we need to exit idle mode to give these calls a chance to
445
    // be processed.
446
    if (inUseStateAggregator.anyObjectInUse(pendingCallsInUseObject, delayedTransport)) {
1✔
447
      exitIdleMode();
1✔
448
    }
449
  }
1✔
450

451
  // Must be run from syncContext
452
  private void cancelIdleTimer(boolean permanent) {
453
    idleTimer.cancel(permanent);
1✔
454
  }
1✔
455

456
  // Always run from syncContext
457
  private void rescheduleIdleTimer() {
458
    if (idleTimeoutMillis == IDLE_TIMEOUT_MILLIS_DISABLE) {
1✔
459
      return;
1✔
460
    }
461
    idleTimer.reschedule(idleTimeoutMillis, TimeUnit.MILLISECONDS);
1✔
462
  }
1✔
463

464
  /**
465
   * Force name resolution refresh to happen immediately. Must be run
466
   * from syncContext.
467
   */
468
  private void refreshNameResolution() {
469
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
470
    if (nameResolverStarted) {
1✔
471
      nameResolver.refresh();
1✔
472
    }
473
  }
1✔
474

475
  private final class ChannelStreamProvider implements ClientStreamProvider {
1✔
476
    volatile Throttle throttle;
477

478
    private ClientTransport getTransport(PickSubchannelArgs args) {
479
      SubchannelPicker pickerCopy = subchannelPicker;
1✔
480
      if (shutdown.get()) {
1✔
481
        // If channel is shut down, delayedTransport is also shut down which will fail the stream
482
        // properly.
483
        return delayedTransport;
1✔
484
      }
485
      if (pickerCopy == null) {
1✔
486
        final class ExitIdleModeForTransport implements Runnable {
1✔
487
          @Override
488
          public void run() {
489
            exitIdleMode();
1✔
490
          }
1✔
491
        }
492

493
        syncContext.execute(new ExitIdleModeForTransport());
1✔
494
        return delayedTransport;
1✔
495
      }
496
      // There is no need to reschedule the idle timer here.
497
      //
498
      // pickerCopy != null, which means idle timer has not expired when this method starts.
499
      // Even if idle timer expires right after we grab pickerCopy, and it shuts down LoadBalancer
500
      // which calls Subchannel.shutdown(), the InternalSubchannel will be actually shutdown after
501
      // SUBCHANNEL_SHUTDOWN_DELAY_SECONDS, which gives the caller time to start RPC on it.
502
      //
503
      // In most cases the idle timer is scheduled to fire after the transport has created the
504
      // stream, which would have reported in-use state to the channel that would have cancelled
505
      // the idle timer.
506
      PickResult pickResult = pickerCopy.pickSubchannel(args);
1✔
507
      ClientTransport transport = GrpcUtil.getTransportFromPickResult(
1✔
508
          pickResult, args.getCallOptions().isWaitForReady());
1✔
509
      if (transport != null) {
1✔
510
        return transport;
1✔
511
      }
512
      return delayedTransport;
1✔
513
    }
514

515
    @Override
516
    public ClientStream newStream(
517
        final MethodDescriptor<?, ?> method,
518
        final CallOptions callOptions,
519
        final Metadata headers,
520
        final Context context) {
521
      if (!retryEnabled) {
1✔
522
        ClientTransport transport =
1✔
523
            getTransport(new PickSubchannelArgsImpl(method, headers, callOptions));
1✔
524
        Context origContext = context.attach();
1✔
525
        ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers(
1✔
526
            callOptions, headers, 0, /* isTransparentRetry= */ false);
527
        try {
528
          return transport.newStream(method, headers, callOptions, tracers);
1✔
529
        } finally {
530
          context.detach(origContext);
1✔
531
        }
532
      } else {
533
        MethodInfo methodInfo = callOptions.getOption(MethodInfo.KEY);
1✔
534
        final RetryPolicy retryPolicy = methodInfo == null ? null : methodInfo.retryPolicy;
1✔
535
        final HedgingPolicy hedgingPolicy = methodInfo == null ? null : methodInfo.hedgingPolicy;
1✔
536
        final class RetryStream<ReqT> extends RetriableStream<ReqT> {
537
          @SuppressWarnings("unchecked")
538
          RetryStream() {
1✔
539
            super(
1✔
540
                (MethodDescriptor<ReqT, ?>) method,
541
                headers,
542
                channelBufferUsed,
1✔
543
                perRpcBufferLimit,
1✔
544
                channelBufferLimit,
1✔
545
                getCallExecutor(callOptions),
1✔
546
                transportFactory.getScheduledExecutorService(),
1✔
547
                retryPolicy,
548
                hedgingPolicy,
549
                throttle);
550
          }
1✔
551

552
          @Override
553
          Status prestart() {
554
            return uncommittedRetriableStreamsRegistry.add(this);
1✔
555
          }
556

557
          @Override
558
          void postCommit() {
559
            uncommittedRetriableStreamsRegistry.remove(this);
1✔
560
          }
1✔
561

562
          @Override
563
          ClientStream newSubstream(
564
              Metadata newHeaders, ClientStreamTracer.Factory factory, int previousAttempts,
565
              boolean isTransparentRetry) {
566
            CallOptions newOptions = callOptions.withStreamTracerFactory(factory);
1✔
567
            ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers(
1✔
568
                newOptions, newHeaders, previousAttempts, isTransparentRetry);
569
            ClientTransport transport =
1✔
570
                getTransport(new PickSubchannelArgsImpl(method, newHeaders, newOptions));
1✔
571
            Context origContext = context.attach();
1✔
572
            try {
573
              return transport.newStream(method, newHeaders, newOptions, tracers);
1✔
574
            } finally {
575
              context.detach(origContext);
1✔
576
            }
577
          }
578
        }
579

580
        return new RetryStream<>();
1✔
581
      }
582
    }
583
  }
584

585
  private final ChannelStreamProvider transportProvider = new ChannelStreamProvider();
1✔
586

587
  private final Rescheduler idleTimer;
588

589
  ManagedChannelImpl(
590
      ManagedChannelImplBuilder builder,
591
      ClientTransportFactory clientTransportFactory,
592
      BackoffPolicy.Provider backoffPolicyProvider,
593
      ObjectPool<? extends Executor> balancerRpcExecutorPool,
594
      Supplier<Stopwatch> stopwatchSupplier,
595
      List<ClientInterceptor> interceptors,
596
      final TimeProvider timeProvider) {
1✔
597
    this.target = checkNotNull(builder.target, "target");
1✔
598
    this.logId = InternalLogId.allocate("Channel", target);
1✔
599
    this.timeProvider = checkNotNull(timeProvider, "timeProvider");
1✔
600
    this.executorPool = checkNotNull(builder.executorPool, "executorPool");
1✔
601
    this.executor = checkNotNull(executorPool.getObject(), "executor");
1✔
602
    this.originalChannelCreds = builder.channelCredentials;
1✔
603
    this.originalTransportFactory = clientTransportFactory;
1✔
604
    this.offloadExecutorHolder =
1✔
605
        new ExecutorHolder(checkNotNull(builder.offloadExecutorPool, "offloadExecutorPool"));
1✔
606
    this.transportFactory = new CallCredentialsApplyingTransportFactory(
1✔
607
        clientTransportFactory, builder.callCredentials, this.offloadExecutorHolder);
608
    this.oobTransportFactory = new CallCredentialsApplyingTransportFactory(
1✔
609
        clientTransportFactory, null, this.offloadExecutorHolder);
610
    this.scheduledExecutor =
1✔
611
        new RestrictedScheduledExecutor(transportFactory.getScheduledExecutorService());
1✔
612
    maxTraceEvents = builder.maxTraceEvents;
1✔
613
    channelTracer = new ChannelTracer(
1✔
614
        logId, builder.maxTraceEvents, timeProvider.currentTimeNanos(),
1✔
615
        "Channel for '" + target + "'");
616
    channelLogger = new ChannelLoggerImpl(channelTracer, timeProvider);
1✔
617
    ProxyDetector proxyDetector =
618
        builder.proxyDetector != null ? builder.proxyDetector : GrpcUtil.DEFAULT_PROXY_DETECTOR;
1✔
619
    this.retryEnabled = builder.retryEnabled;
1✔
620
    this.loadBalancerFactory = new AutoConfiguredLoadBalancerFactory(builder.defaultLbPolicy);
1✔
621
    this.nameResolverRegistry = builder.nameResolverRegistry;
1✔
622
    ScParser serviceConfigParser =
1✔
623
        new ScParser(
624
            retryEnabled,
625
            builder.maxRetryAttempts,
626
            builder.maxHedgedAttempts,
627
            loadBalancerFactory);
628
    this.authorityOverride = builder.authorityOverride;
1✔
629
    this.nameResolverArgs =
1✔
630
        NameResolver.Args.newBuilder()
1✔
631
            .setDefaultPort(builder.getDefaultPort())
1✔
632
            .setProxyDetector(proxyDetector)
1✔
633
            .setSynchronizationContext(syncContext)
1✔
634
            .setScheduledExecutorService(scheduledExecutor)
1✔
635
            .setServiceConfigParser(serviceConfigParser)
1✔
636
            .setChannelLogger(channelLogger)
1✔
637
            .setOffloadExecutor(this.offloadExecutorHolder)
1✔
638
            .setOverrideAuthority(this.authorityOverride)
1✔
639
            .build();
1✔
640
    this.nameResolver = getNameResolver(
1✔
641
        target, authorityOverride, nameResolverRegistry, nameResolverArgs,
642
        transportFactory.getSupportedSocketAddressTypes());
1✔
643
    this.balancerRpcExecutorPool = checkNotNull(balancerRpcExecutorPool, "balancerRpcExecutorPool");
1✔
644
    this.balancerRpcExecutorHolder = new ExecutorHolder(balancerRpcExecutorPool);
1✔
645
    this.delayedTransport = new DelayedClientTransport(this.executor, this.syncContext);
1✔
646
    this.delayedTransport.start(delayedTransportListener);
1✔
647
    this.backoffPolicyProvider = backoffPolicyProvider;
1✔
648

649
    if (builder.defaultServiceConfig != null) {
1✔
650
      ConfigOrError parsedDefaultServiceConfig =
1✔
651
          serviceConfigParser.parseServiceConfig(builder.defaultServiceConfig);
1✔
652
      checkState(
1✔
653
          parsedDefaultServiceConfig.getError() == null,
1✔
654
          "Default config is invalid: %s",
655
          parsedDefaultServiceConfig.getError());
1✔
656
      this.defaultServiceConfig =
1✔
657
          (ManagedChannelServiceConfig) parsedDefaultServiceConfig.getConfig();
1✔
658
      this.lastServiceConfig = this.defaultServiceConfig;
1✔
659
    } else {
1✔
660
      this.defaultServiceConfig = null;
1✔
661
    }
662
    this.lookUpServiceConfig = builder.lookUpServiceConfig;
1✔
663
    realChannel = new RealChannel(nameResolver.getServiceAuthority());
1✔
664
    Channel channel = realChannel;
1✔
665
    if (builder.binlog != null) {
1✔
666
      channel = builder.binlog.wrapChannel(channel);
1✔
667
    }
668
    this.interceptorChannel = ClientInterceptors.intercept(channel, interceptors);
1✔
669
    this.transportFilters = new ArrayList<>(builder.transportFilters);
1✔
670
    this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier");
1✔
671
    if (builder.idleTimeoutMillis == IDLE_TIMEOUT_MILLIS_DISABLE) {
1✔
672
      this.idleTimeoutMillis = builder.idleTimeoutMillis;
1✔
673
    } else {
674
      checkArgument(
1✔
675
          builder.idleTimeoutMillis
676
              >= ManagedChannelImplBuilder.IDLE_MODE_MIN_TIMEOUT_MILLIS,
677
          "invalid idleTimeoutMillis %s", builder.idleTimeoutMillis);
678
      this.idleTimeoutMillis = builder.idleTimeoutMillis;
1✔
679
    }
680

681
    idleTimer = new Rescheduler(
1✔
682
        new IdleModeTimer(),
683
        syncContext,
684
        transportFactory.getScheduledExecutorService(),
1✔
685
        stopwatchSupplier.get());
1✔
686
    this.fullStreamDecompression = builder.fullStreamDecompression;
1✔
687
    this.decompressorRegistry = checkNotNull(builder.decompressorRegistry, "decompressorRegistry");
1✔
688
    this.compressorRegistry = checkNotNull(builder.compressorRegistry, "compressorRegistry");
1✔
689
    this.userAgent = builder.userAgent;
1✔
690

691
    this.channelBufferLimit = builder.retryBufferSize;
1✔
692
    this.perRpcBufferLimit = builder.perRpcBufferLimit;
1✔
693
    final class ChannelCallTracerFactory implements CallTracer.Factory {
1✔
694
      @Override
695
      public CallTracer create() {
696
        return new CallTracer(timeProvider);
1✔
697
      }
698
    }
699

700
    this.callTracerFactory = new ChannelCallTracerFactory();
1✔
701
    channelCallTracer = callTracerFactory.create();
1✔
702
    this.channelz = checkNotNull(builder.channelz);
1✔
703
    channelz.addRootChannel(this);
1✔
704

705
    if (!lookUpServiceConfig) {
1✔
706
      if (defaultServiceConfig != null) {
1✔
707
        channelLogger.log(
1✔
708
            ChannelLogLevel.INFO, "Service config look-up disabled, using default service config");
709
      }
710
      serviceConfigUpdated = true;
1✔
711
    }
712
  }
1✔
713

714
  private static NameResolver getNameResolver(
715
      String target, NameResolverRegistry nameResolverRegistry, NameResolver.Args nameResolverArgs,
716
      Collection<Class<? extends SocketAddress>> channelTransportSocketAddressTypes) {
717
    // Finding a NameResolver. Try using the target string as the URI. If that fails, try prepending
718
    // "dns:///".
719
    NameResolverProvider provider = null;
1✔
720
    URI targetUri = null;
1✔
721
    StringBuilder uriSyntaxErrors = new StringBuilder();
1✔
722
    try {
723
      targetUri = new URI(target);
1✔
724
    } catch (URISyntaxException e) {
1✔
725
      // Can happen with ip addresses like "[::1]:1234" or 127.0.0.1:1234.
726
      uriSyntaxErrors.append(e.getMessage());
1✔
727
    }
1✔
728
    if (targetUri != null) {
1✔
729
      // For "localhost:8080" this would likely cause provider to be null, because "localhost" is
730
      // parsed as the scheme. Will hit the next case and try "dns:///localhost:8080".
731
      provider = nameResolverRegistry.getProviderForScheme(targetUri.getScheme());
1✔
732
    }
733

734
    if (provider == null && !URI_PATTERN.matcher(target).matches()) {
1✔
735
      // It doesn't look like a URI target. Maybe it's an authority string. Try with the default
736
      // scheme from the registry.
737
      try {
738
        targetUri = new URI(nameResolverRegistry.getDefaultScheme(), "", "/" + target, null);
1✔
739
      } catch (URISyntaxException e) {
×
740
        // Should not be possible.
741
        throw new IllegalArgumentException(e);
×
742
      }
1✔
743
      provider = nameResolverRegistry.getProviderForScheme(targetUri.getScheme());
1✔
744
    }
745

746
    if (provider == null) {
1✔
747
      throw new IllegalArgumentException(String.format(
1✔
748
          "Could not find a NameResolverProvider for %s%s",
749
          target, uriSyntaxErrors.length() > 0 ? " (" + uriSyntaxErrors + ")" : ""));
1✔
750
    }
751

752
    if (channelTransportSocketAddressTypes != null) {
1✔
753
      Collection<Class<? extends SocketAddress>> nameResolverSocketAddressTypes
1✔
754
          = provider.getProducedSocketAddressTypes();
1✔
755
      if (!channelTransportSocketAddressTypes.containsAll(nameResolverSocketAddressTypes)) {
1✔
756
        throw new IllegalArgumentException(String.format(
1✔
757
            "Address types of NameResolver '%s' for '%s' not supported by transport",
758
            targetUri.getScheme(), target));
1✔
759
      }
760
    }
761

762
    NameResolver resolver = provider.newNameResolver(targetUri, nameResolverArgs);
1✔
763
    if (resolver != null) {
1✔
764
      return resolver;
1✔
765
    }
766

767
    throw new IllegalArgumentException(String.format(
1✔
768
        "cannot create a NameResolver for %s%s",
769
        target, uriSyntaxErrors.length() > 0 ? " (" + uriSyntaxErrors + ")" : ""));
1✔
770
  }
771

772
  @VisibleForTesting
773
  static NameResolver getNameResolver(
774
      String target, @Nullable final String overrideAuthority,
775
      NameResolverRegistry nameResolverRegistry, NameResolver.Args nameResolverArgs,
776
      Collection<Class<? extends SocketAddress>> channelTransportSocketAddressTypes) {
777
    NameResolver resolver = getNameResolver(target, nameResolverRegistry, nameResolverArgs,
1✔
778
        channelTransportSocketAddressTypes);
779

780
    // We wrap the name resolver in a RetryingNameResolver to give it the ability to retry failures.
781
    // TODO: After a transition period, all NameResolver implementations that need retry should use
782
    //       RetryingNameResolver directly and this step can be removed.
783
    NameResolver usedNameResolver = new RetryingNameResolver(resolver,
1✔
784
          new BackoffPolicyRetryScheduler(new ExponentialBackoffPolicy.Provider(),
785
              nameResolverArgs.getScheduledExecutorService(),
1✔
786
              nameResolverArgs.getSynchronizationContext()),
1✔
787
          nameResolverArgs.getSynchronizationContext());
1✔
788

789
    if (overrideAuthority == null) {
1✔
790
      return usedNameResolver;
1✔
791
    }
792

793
    return new ForwardingNameResolver(usedNameResolver) {
1✔
794
      @Override
795
      public String getServiceAuthority() {
796
        return overrideAuthority;
1✔
797
      }
798
    };
799
  }
800

801
  @VisibleForTesting
802
  InternalConfigSelector getConfigSelector() {
803
    return realChannel.configSelector.get();
1✔
804
  }
805

806
  /**
807
   * Initiates an orderly shutdown in which preexisting calls continue but new calls are immediately
808
   * cancelled.
809
   */
810
  @Override
811
  public ManagedChannelImpl shutdown() {
812
    channelLogger.log(ChannelLogLevel.DEBUG, "shutdown() called");
1✔
813
    if (!shutdown.compareAndSet(false, true)) {
1✔
814
      return this;
1✔
815
    }
816
    final class Shutdown implements Runnable {
1✔
817
      @Override
818
      public void run() {
819
        channelLogger.log(ChannelLogLevel.INFO, "Entering SHUTDOWN state");
1✔
820
        channelStateManager.gotoState(SHUTDOWN);
1✔
821
      }
1✔
822
    }
823

824
    syncContext.execute(new Shutdown());
1✔
825
    realChannel.shutdown();
1✔
826
    final class CancelIdleTimer implements Runnable {
1✔
827
      @Override
828
      public void run() {
829
        cancelIdleTimer(/* permanent= */ true);
1✔
830
      }
1✔
831
    }
832

833
    syncContext.execute(new CancelIdleTimer());
1✔
834
    return this;
1✔
835
  }
836

837
  /**
838
   * Initiates a forceful shutdown in which preexisting and new calls are cancelled. Although
839
   * forceful, the shutdown process is still not instantaneous; {@link #isTerminated()} will likely
840
   * return {@code false} immediately after this method returns.
841
   */
842
  @Override
843
  public ManagedChannelImpl shutdownNow() {
844
    channelLogger.log(ChannelLogLevel.DEBUG, "shutdownNow() called");
1✔
845
    shutdown();
1✔
846
    realChannel.shutdownNow();
1✔
847
    final class ShutdownNow implements Runnable {
1✔
848
      @Override
849
      public void run() {
850
        if (shutdownNowed) {
1✔
851
          return;
1✔
852
        }
853
        shutdownNowed = true;
1✔
854
        maybeShutdownNowSubchannels();
1✔
855
      }
1✔
856
    }
857

858
    syncContext.execute(new ShutdownNow());
1✔
859
    return this;
1✔
860
  }
861

862
  // Called from syncContext
863
  @VisibleForTesting
864
  void panic(final Throwable t) {
865
    if (panicMode) {
1✔
866
      // Preserve the first panic information
867
      return;
×
868
    }
869
    panicMode = true;
1✔
870
    cancelIdleTimer(/* permanent= */ true);
1✔
871
    shutdownNameResolverAndLoadBalancer(false);
1✔
872
    final class PanicSubchannelPicker extends SubchannelPicker {
1✔
873
      private final PickResult panicPickResult =
1✔
874
          PickResult.withDrop(
1✔
875
              Status.INTERNAL.withDescription("Panic! This is a bug!").withCause(t));
1✔
876

877
      @Override
878
      public PickResult pickSubchannel(PickSubchannelArgs args) {
879
        return panicPickResult;
1✔
880
      }
881

882
      @Override
883
      public String toString() {
884
        return MoreObjects.toStringHelper(PanicSubchannelPicker.class)
×
885
            .add("panicPickResult", panicPickResult)
×
886
            .toString();
×
887
      }
888
    }
889

890
    updateSubchannelPicker(new PanicSubchannelPicker());
1✔
891
    realChannel.updateConfigSelector(null);
1✔
892
    channelLogger.log(ChannelLogLevel.ERROR, "PANIC! Entering TRANSIENT_FAILURE");
1✔
893
    channelStateManager.gotoState(TRANSIENT_FAILURE);
1✔
894
  }
1✔
895

896
  @VisibleForTesting
897
  boolean isInPanicMode() {
898
    return panicMode;
1✔
899
  }
900

901
  // Called from syncContext
902
  private void updateSubchannelPicker(SubchannelPicker newPicker) {
903
    subchannelPicker = newPicker;
1✔
904
    delayedTransport.reprocess(newPicker);
1✔
905
  }
1✔
906

907
  @Override
908
  public boolean isShutdown() {
909
    return shutdown.get();
1✔
910
  }
911

912
  @Override
913
  public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
914
    return terminatedLatch.await(timeout, unit);
1✔
915
  }
916

917
  @Override
918
  public boolean isTerminated() {
919
    return terminated;
1✔
920
  }
921

922
  /*
923
   * Creates a new outgoing call on the channel.
924
   */
925
  @Override
926
  public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(MethodDescriptor<ReqT, RespT> method,
927
      CallOptions callOptions) {
928
    return interceptorChannel.newCall(method, callOptions);
1✔
929
  }
930

931
  @Override
932
  public String authority() {
933
    return interceptorChannel.authority();
1✔
934
  }
935

936
  private Executor getCallExecutor(CallOptions callOptions) {
937
    Executor executor = callOptions.getExecutor();
1✔
938
    if (executor == null) {
1✔
939
      executor = this.executor;
1✔
940
    }
941
    return executor;
1✔
942
  }
943

944
  private class RealChannel extends Channel {
945
    // Reference to null if no config selector is available from resolution result
946
    // Reference must be set() from syncContext
947
    private final AtomicReference<InternalConfigSelector> configSelector =
1✔
948
        new AtomicReference<>(INITIAL_PENDING_SELECTOR);
1✔
949
    // Set when the NameResolver is initially created. When we create a new NameResolver for the
950
    // same target, the new instance must have the same value.
951
    private final String authority;
952

953
    private final Channel clientCallImplChannel = new Channel() {
1✔
954
      @Override
955
      public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
956
          MethodDescriptor<RequestT, ResponseT> method, CallOptions callOptions) {
957
        return new ClientCallImpl<>(
1✔
958
            method,
959
            getCallExecutor(callOptions),
1✔
960
            callOptions,
961
            transportProvider,
1✔
962
            terminated ? null : transportFactory.getScheduledExecutorService(),
1✔
963
            channelCallTracer,
1✔
964
            null)
965
            .setFullStreamDecompression(fullStreamDecompression)
1✔
966
            .setDecompressorRegistry(decompressorRegistry)
1✔
967
            .setCompressorRegistry(compressorRegistry);
1✔
968
      }
969

970
      @Override
971
      public String authority() {
972
        return authority;
×
973
      }
974
    };
975

976
    private RealChannel(String authority) {
1✔
977
      this.authority =  checkNotNull(authority, "authority");
1✔
978
    }
1✔
979

980
    @Override
981
    public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(
982
        MethodDescriptor<ReqT, RespT> method, CallOptions callOptions) {
983
      if (configSelector.get() != INITIAL_PENDING_SELECTOR) {
1✔
984
        return newClientCall(method, callOptions);
1✔
985
      }
986
      syncContext.execute(new Runnable() {
1✔
987
        @Override
988
        public void run() {
989
          exitIdleMode();
1✔
990
        }
1✔
991
      });
992
      if (configSelector.get() != INITIAL_PENDING_SELECTOR) {
1✔
993
        // This is an optimization for the case (typically with InProcessTransport) when name
994
        // resolution result is immediately available at this point. Otherwise, some users'
995
        // tests might observe slight behavior difference from earlier grpc versions.
996
        return newClientCall(method, callOptions);
1✔
997
      }
998
      if (shutdown.get()) {
1✔
999
        // Return a failing ClientCall.
1000
        return new ClientCall<ReqT, RespT>() {
×
1001
          @Override
1002
          public void start(Listener<RespT> responseListener, Metadata headers) {
1003
            responseListener.onClose(SHUTDOWN_STATUS, new Metadata());
×
1004
          }
×
1005

1006
          @Override public void request(int numMessages) {}
×
1007

1008
          @Override public void cancel(@Nullable String message, @Nullable Throwable cause) {}
×
1009

1010
          @Override public void halfClose() {}
×
1011

1012
          @Override public void sendMessage(ReqT message) {}
×
1013
        };
1014
      }
1015
      Context context = Context.current();
1✔
1016
      final PendingCall<ReqT, RespT> pendingCall = new PendingCall<>(context, method, callOptions);
1✔
1017
      syncContext.execute(new Runnable() {
1✔
1018
        @Override
1019
        public void run() {
1020
          if (configSelector.get() == INITIAL_PENDING_SELECTOR) {
1✔
1021
            if (pendingCalls == null) {
1✔
1022
              pendingCalls = new LinkedHashSet<>();
1✔
1023
              inUseStateAggregator.updateObjectInUse(pendingCallsInUseObject, true);
1✔
1024
            }
1025
            pendingCalls.add(pendingCall);
1✔
1026
          } else {
1027
            pendingCall.reprocess();
1✔
1028
          }
1029
        }
1✔
1030
      });
1031
      return pendingCall;
1✔
1032
    }
1033

1034
    // Must run in SynchronizationContext.
1035
    void updateConfigSelector(@Nullable InternalConfigSelector config) {
1036
      InternalConfigSelector prevConfig = configSelector.get();
1✔
1037
      configSelector.set(config);
1✔
1038
      if (prevConfig == INITIAL_PENDING_SELECTOR && pendingCalls != null) {
1✔
1039
        for (RealChannel.PendingCall<?, ?> pendingCall : pendingCalls) {
1✔
1040
          pendingCall.reprocess();
1✔
1041
        }
1✔
1042
      }
1043
    }
1✔
1044

1045
    // Must run in SynchronizationContext.
1046
    void onConfigError() {
1047
      if (configSelector.get() == INITIAL_PENDING_SELECTOR) {
1✔
1048
        updateConfigSelector(null);
1✔
1049
      }
1050
    }
1✔
1051

1052
    void shutdown() {
1053
      final class RealChannelShutdown implements Runnable {
1✔
1054
        @Override
1055
        public void run() {
1056
          if (pendingCalls == null) {
1✔
1057
            if (configSelector.get() == INITIAL_PENDING_SELECTOR) {
1✔
1058
              configSelector.set(null);
1✔
1059
            }
1060
            uncommittedRetriableStreamsRegistry.onShutdown(SHUTDOWN_STATUS);
1✔
1061
          }
1062
        }
1✔
1063
      }
1064

1065
      syncContext.execute(new RealChannelShutdown());
1✔
1066
    }
1✔
1067

1068
    void shutdownNow() {
1069
      final class RealChannelShutdownNow implements Runnable {
1✔
1070
        @Override
1071
        public void run() {
1072
          if (configSelector.get() == INITIAL_PENDING_SELECTOR) {
1✔
1073
            configSelector.set(null);
1✔
1074
          }
1075
          if (pendingCalls != null) {
1✔
1076
            for (RealChannel.PendingCall<?, ?> pendingCall : pendingCalls) {
1✔
1077
              pendingCall.cancel("Channel is forcefully shutdown", null);
1✔
1078
            }
1✔
1079
          }
1080
          uncommittedRetriableStreamsRegistry.onShutdownNow(SHUTDOWN_NOW_STATUS);
1✔
1081
        }
1✔
1082
      }
1083

1084
      syncContext.execute(new RealChannelShutdownNow());
1✔
1085
    }
1✔
1086

1087
    @Override
1088
    public String authority() {
1089
      return authority;
1✔
1090
    }
1091

1092
    private final class PendingCall<ReqT, RespT> extends DelayedClientCall<ReqT, RespT> {
1093
      final Context context;
1094
      final MethodDescriptor<ReqT, RespT> method;
1095
      final CallOptions callOptions;
1096
      private final long callCreationTime;
1097

1098
      PendingCall(
1099
          Context context, MethodDescriptor<ReqT, RespT> method, CallOptions callOptions) {
1✔
1100
        super(getCallExecutor(callOptions), scheduledExecutor, callOptions.getDeadline());
1✔
1101
        this.context = context;
1✔
1102
        this.method = method;
1✔
1103
        this.callOptions = callOptions;
1✔
1104
        this.callCreationTime = ticker.nanoTime();
1✔
1105
      }
1✔
1106

1107
      /** Called when it's ready to create a real call and reprocess the pending call. */
1108
      void reprocess() {
1109
        ClientCall<ReqT, RespT> realCall;
1110
        Context previous = context.attach();
1✔
1111
        try {
1112
          CallOptions delayResolutionOption = callOptions.withOption(NAME_RESOLUTION_DELAYED,
1✔
1113
              ticker.nanoTime() - callCreationTime);
1✔
1114
          realCall = newClientCall(method, delayResolutionOption);
1✔
1115
        } finally {
1116
          context.detach(previous);
1✔
1117
        }
1118
        Runnable toRun = setCall(realCall);
1✔
1119
        if (toRun == null) {
1✔
1120
          syncContext.execute(new PendingCallRemoval());
1✔
1121
        } else {
1122
          getCallExecutor(callOptions).execute(new Runnable() {
1✔
1123
            @Override
1124
            public void run() {
1125
              toRun.run();
1✔
1126
              syncContext.execute(new PendingCallRemoval());
1✔
1127
            }
1✔
1128
          });
1129
        }
1130
      }
1✔
1131

1132
      @Override
1133
      protected void callCancelled() {
1134
        super.callCancelled();
1✔
1135
        syncContext.execute(new PendingCallRemoval());
1✔
1136
      }
1✔
1137

1138
      final class PendingCallRemoval implements Runnable {
1✔
1139
        @Override
1140
        public void run() {
1141
          if (pendingCalls != null) {
1✔
1142
            pendingCalls.remove(PendingCall.this);
1✔
1143
            if (pendingCalls.isEmpty()) {
1✔
1144
              inUseStateAggregator.updateObjectInUse(pendingCallsInUseObject, false);
1✔
1145
              pendingCalls = null;
1✔
1146
              if (shutdown.get()) {
1✔
1147
                uncommittedRetriableStreamsRegistry.onShutdown(SHUTDOWN_STATUS);
1✔
1148
              }
1149
            }
1150
          }
1151
        }
1✔
1152
      }
1153
    }
1154

1155
    private <ReqT, RespT> ClientCall<ReqT, RespT> newClientCall(
1156
        MethodDescriptor<ReqT, RespT> method, CallOptions callOptions) {
1157
      InternalConfigSelector selector = configSelector.get();
1✔
1158
      if (selector == null) {
1✔
1159
        return clientCallImplChannel.newCall(method, callOptions);
1✔
1160
      }
1161
      if (selector instanceof ServiceConfigConvertedSelector) {
1✔
1162
        MethodInfo methodInfo =
1✔
1163
            ((ServiceConfigConvertedSelector) selector).config.getMethodConfig(method);
1✔
1164
        if (methodInfo != null) {
1✔
1165
          callOptions = callOptions.withOption(MethodInfo.KEY, methodInfo);
1✔
1166
        }
1167
        return clientCallImplChannel.newCall(method, callOptions);
1✔
1168
      }
1169
      return new ConfigSelectingClientCall<>(
1✔
1170
          selector, clientCallImplChannel, executor, method, callOptions);
1✔
1171
    }
1172
  }
1173

1174
  /**
1175
   * A client call for a given channel that applies a given config selector when it starts.
1176
   */
1177
  static final class ConfigSelectingClientCall<ReqT, RespT>
1178
      extends ForwardingClientCall<ReqT, RespT> {
1179

1180
    private final InternalConfigSelector configSelector;
1181
    private final Channel channel;
1182
    private final Executor callExecutor;
1183
    private final MethodDescriptor<ReqT, RespT> method;
1184
    private final Context context;
1185
    private CallOptions callOptions;
1186

1187
    private ClientCall<ReqT, RespT> delegate;
1188

1189
    ConfigSelectingClientCall(
1190
        InternalConfigSelector configSelector, Channel channel, Executor channelExecutor,
1191
        MethodDescriptor<ReqT, RespT> method,
1192
        CallOptions callOptions) {
1✔
1193
      this.configSelector = configSelector;
1✔
1194
      this.channel = channel;
1✔
1195
      this.method = method;
1✔
1196
      this.callExecutor =
1✔
1197
          callOptions.getExecutor() == null ? channelExecutor : callOptions.getExecutor();
1✔
1198
      this.callOptions = callOptions.withExecutor(callExecutor);
1✔
1199
      this.context = Context.current();
1✔
1200
    }
1✔
1201

1202
    @Override
1203
    protected ClientCall<ReqT, RespT> delegate() {
1204
      return delegate;
1✔
1205
    }
1206

1207
    @SuppressWarnings("unchecked")
1208
    @Override
1209
    public void start(Listener<RespT> observer, Metadata headers) {
1210
      PickSubchannelArgs args = new PickSubchannelArgsImpl(method, headers, callOptions);
1✔
1211
      InternalConfigSelector.Result result = configSelector.selectConfig(args);
1✔
1212
      Status status = result.getStatus();
1✔
1213
      if (!status.isOk()) {
1✔
1214
        executeCloseObserverInContext(observer,
1✔
1215
            GrpcUtil.replaceInappropriateControlPlaneStatus(status));
1✔
1216
        delegate = (ClientCall<ReqT, RespT>) NOOP_CALL;
1✔
1217
        return;
1✔
1218
      }
1219
      ClientInterceptor interceptor = result.getInterceptor();
1✔
1220
      ManagedChannelServiceConfig config = (ManagedChannelServiceConfig) result.getConfig();
1✔
1221
      MethodInfo methodInfo = config.getMethodConfig(method);
1✔
1222
      if (methodInfo != null) {
1✔
1223
        callOptions = callOptions.withOption(MethodInfo.KEY, methodInfo);
1✔
1224
      }
1225
      if (interceptor != null) {
1✔
1226
        delegate = interceptor.interceptCall(method, callOptions, channel);
1✔
1227
      } else {
1228
        delegate = channel.newCall(method, callOptions);
×
1229
      }
1230
      delegate.start(observer, headers);
1✔
1231
    }
1✔
1232

1233
    private void executeCloseObserverInContext(
1234
        final Listener<RespT> observer, final Status status) {
1235
      class CloseInContext extends ContextRunnable {
1236
        CloseInContext() {
1✔
1237
          super(context);
1✔
1238
        }
1✔
1239

1240
        @Override
1241
        public void runInContext() {
1242
          observer.onClose(status, new Metadata());
1✔
1243
        }
1✔
1244
      }
1245

1246
      callExecutor.execute(new CloseInContext());
1✔
1247
    }
1✔
1248

1249
    @Override
1250
    public void cancel(@Nullable String message, @Nullable Throwable cause) {
1251
      if (delegate != null) {
×
1252
        delegate.cancel(message, cause);
×
1253
      }
1254
    }
×
1255
  }
1256

1257
  private static final ClientCall<Object, Object> NOOP_CALL = new ClientCall<Object, Object>() {
1✔
1258
    @Override
1259
    public void start(Listener<Object> responseListener, Metadata headers) {}
×
1260

1261
    @Override
1262
    public void request(int numMessages) {}
1✔
1263

1264
    @Override
1265
    public void cancel(String message, Throwable cause) {}
×
1266

1267
    @Override
1268
    public void halfClose() {}
×
1269

1270
    @Override
1271
    public void sendMessage(Object message) {}
×
1272

1273
    // Always returns {@code false}, since this is only used when the startup of the call fails.
1274
    @Override
1275
    public boolean isReady() {
1276
      return false;
×
1277
    }
1278
  };
1279

1280
  /**
1281
   * Terminate the channel if termination conditions are met.
1282
   */
1283
  // Must be run from syncContext
1284
  private void maybeTerminateChannel() {
1285
    if (terminated) {
1✔
1286
      return;
×
1287
    }
1288
    if (shutdown.get() && subchannels.isEmpty() && oobChannels.isEmpty()) {
1✔
1289
      channelLogger.log(ChannelLogLevel.INFO, "Terminated");
1✔
1290
      channelz.removeRootChannel(this);
1✔
1291
      executorPool.returnObject(executor);
1✔
1292
      balancerRpcExecutorHolder.release();
1✔
1293
      offloadExecutorHolder.release();
1✔
1294
      // Release the transport factory so that it can deallocate any resources.
1295
      transportFactory.close();
1✔
1296

1297
      terminated = true;
1✔
1298
      terminatedLatch.countDown();
1✔
1299
    }
1300
  }
1✔
1301

1302
  // Must be called from syncContext
1303
  private void handleInternalSubchannelState(ConnectivityStateInfo newState) {
1304
    if (newState.getState() == TRANSIENT_FAILURE || newState.getState() == IDLE) {
1✔
1305
      refreshNameResolution();
1✔
1306
    }
1307
  }
1✔
1308

1309
  @Override
1310
  @SuppressWarnings("deprecation")
1311
  public ConnectivityState getState(boolean requestConnection) {
1312
    ConnectivityState savedChannelState = channelStateManager.getState();
1✔
1313
    if (requestConnection && savedChannelState == IDLE) {
1✔
1314
      final class RequestConnection implements Runnable {
1✔
1315
        @Override
1316
        public void run() {
1317
          exitIdleMode();
1✔
1318
          if (subchannelPicker != null) {
1✔
1319
            subchannelPicker.requestConnection();
1✔
1320
          }
1321
          if (lbHelper != null) {
1✔
1322
            lbHelper.lb.requestConnection();
1✔
1323
          }
1324
        }
1✔
1325
      }
1326

1327
      syncContext.execute(new RequestConnection());
1✔
1328
    }
1329
    return savedChannelState;
1✔
1330
  }
1331

1332
  @Override
1333
  public void notifyWhenStateChanged(final ConnectivityState source, final Runnable callback) {
1334
    final class NotifyStateChanged implements Runnable {
1✔
1335
      @Override
1336
      public void run() {
1337
        channelStateManager.notifyWhenStateChanged(callback, executor, source);
1✔
1338
      }
1✔
1339
    }
1340

1341
    syncContext.execute(new NotifyStateChanged());
1✔
1342
  }
1✔
1343

1344
  @Override
1345
  public void resetConnectBackoff() {
1346
    final class ResetConnectBackoff implements Runnable {
1✔
1347
      @Override
1348
      public void run() {
1349
        if (shutdown.get()) {
1✔
1350
          return;
1✔
1351
        }
1352
        if (nameResolverStarted) {
1✔
1353
          refreshNameResolution();
1✔
1354
        }
1355
        for (InternalSubchannel subchannel : subchannels) {
1✔
1356
          subchannel.resetConnectBackoff();
1✔
1357
        }
1✔
1358
        for (OobChannel oobChannel : oobChannels) {
1✔
1359
          oobChannel.resetConnectBackoff();
×
1360
        }
×
1361
      }
1✔
1362
    }
1363

1364
    syncContext.execute(new ResetConnectBackoff());
1✔
1365
  }
1✔
1366

1367
  @Override
1368
  public void enterIdle() {
1369
    final class PrepareToLoseNetworkRunnable implements Runnable {
1✔
1370
      @Override
1371
      public void run() {
1372
        if (shutdown.get() || lbHelper == null) {
1✔
1373
          return;
1✔
1374
        }
1375
        cancelIdleTimer(/* permanent= */ false);
1✔
1376
        enterIdleMode();
1✔
1377
      }
1✔
1378
    }
1379

1380
    syncContext.execute(new PrepareToLoseNetworkRunnable());
1✔
1381
  }
1✔
1382

1383
  /**
1384
   * A registry that prevents channel shutdown from killing existing retry attempts that are in
1385
   * backoff.
1386
   */
1387
  private final class UncommittedRetriableStreamsRegistry {
1✔
1388
    // TODO(zdapeng): This means we would acquire a lock for each new retry-able stream,
1389
    // it's worthwhile to look for a lock-free approach.
1390
    final Object lock = new Object();
1✔
1391

1392
    @GuardedBy("lock")
1✔
1393
    Collection<ClientStream> uncommittedRetriableStreams = new HashSet<>();
1394

1395
    @GuardedBy("lock")
1396
    Status shutdownStatus;
1397

1398
    void onShutdown(Status reason) {
1399
      boolean shouldShutdownDelayedTransport = false;
1✔
1400
      synchronized (lock) {
1✔
1401
        if (shutdownStatus != null) {
1✔
1402
          return;
1✔
1403
        }
1404
        shutdownStatus = reason;
1✔
1405
        // Keep the delayedTransport open until there is no more uncommitted streams, b/c those
1406
        // retriable streams, which may be in backoff and not using any transport, are already
1407
        // started RPCs.
1408
        if (uncommittedRetriableStreams.isEmpty()) {
1✔
1409
          shouldShutdownDelayedTransport = true;
1✔
1410
        }
1411
      }
1✔
1412

1413
      if (shouldShutdownDelayedTransport) {
1✔
1414
        delayedTransport.shutdown(reason);
1✔
1415
      }
1416
    }
1✔
1417

1418
    void onShutdownNow(Status reason) {
1419
      onShutdown(reason);
1✔
1420
      Collection<ClientStream> streams;
1421

1422
      synchronized (lock) {
1✔
1423
        streams = new ArrayList<>(uncommittedRetriableStreams);
1✔
1424
      }
1✔
1425

1426
      for (ClientStream stream : streams) {
1✔
1427
        stream.cancel(reason);
1✔
1428
      }
1✔
1429
      delayedTransport.shutdownNow(reason);
1✔
1430
    }
1✔
1431

1432
    /**
1433
     * Registers a RetriableStream and return null if not shutdown, otherwise just returns the
1434
     * shutdown Status.
1435
     */
1436
    @Nullable
1437
    Status add(RetriableStream<?> retriableStream) {
1438
      synchronized (lock) {
1✔
1439
        if (shutdownStatus != null) {
1✔
1440
          return shutdownStatus;
1✔
1441
        }
1442
        uncommittedRetriableStreams.add(retriableStream);
1✔
1443
        return null;
1✔
1444
      }
1445
    }
1446

1447
    void remove(RetriableStream<?> retriableStream) {
1448
      Status shutdownStatusCopy = null;
1✔
1449

1450
      synchronized (lock) {
1✔
1451
        uncommittedRetriableStreams.remove(retriableStream);
1✔
1452
        if (uncommittedRetriableStreams.isEmpty()) {
1✔
1453
          shutdownStatusCopy = shutdownStatus;
1✔
1454
          // Because retriable transport is long-lived, we take this opportunity to down-size the
1455
          // hashmap.
1456
          uncommittedRetriableStreams = new HashSet<>();
1✔
1457
        }
1458
      }
1✔
1459

1460
      if (shutdownStatusCopy != null) {
1✔
1461
        delayedTransport.shutdown(shutdownStatusCopy);
1✔
1462
      }
1463
    }
1✔
1464
  }
1465

1466
  private final class LbHelperImpl extends LoadBalancer.Helper {
1✔
1467
    AutoConfiguredLoadBalancer lb;
1468

1469
    @Override
1470
    public AbstractSubchannel createSubchannel(CreateSubchannelArgs args) {
1471
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1472
      // No new subchannel should be created after load balancer has been shutdown.
1473
      checkState(!terminating, "Channel is being terminated");
1✔
1474
      return new SubchannelImpl(args);
1✔
1475
    }
1476

1477
    @Override
1478
    public void updateBalancingState(
1479
        final ConnectivityState newState, final SubchannelPicker newPicker) {
1480
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1481
      checkNotNull(newState, "newState");
1✔
1482
      checkNotNull(newPicker, "newPicker");
1✔
1483
      final class UpdateBalancingState implements Runnable {
1✔
1484
        @Override
1485
        public void run() {
1486
          if (LbHelperImpl.this != lbHelper) {
1✔
1487
            return;
1✔
1488
          }
1489
          updateSubchannelPicker(newPicker);
1✔
1490
          // It's not appropriate to report SHUTDOWN state from lb.
1491
          // Ignore the case of newState == SHUTDOWN for now.
1492
          if (newState != SHUTDOWN) {
1✔
1493
            channelLogger.log(
1✔
1494
                ChannelLogLevel.INFO, "Entering {0} state with picker: {1}", newState, newPicker);
1495
            channelStateManager.gotoState(newState);
1✔
1496
          }
1497
        }
1✔
1498
      }
1499

1500
      syncContext.execute(new UpdateBalancingState());
1✔
1501
    }
1✔
1502

1503
    @Override
1504
    public void refreshNameResolution() {
1505
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1506
      final class LoadBalancerRefreshNameResolution implements Runnable {
1✔
1507
        @Override
1508
        public void run() {
1509
          ManagedChannelImpl.this.refreshNameResolution();
1✔
1510
        }
1✔
1511
      }
1512

1513
      syncContext.execute(new LoadBalancerRefreshNameResolution());
1✔
1514
    }
1✔
1515

1516
    @Override
1517
    public ManagedChannel createOobChannel(EquivalentAddressGroup addressGroup, String authority) {
1518
      return createOobChannel(Collections.singletonList(addressGroup), authority);
×
1519
    }
1520

1521
    @Override
1522
    public ManagedChannel createOobChannel(List<EquivalentAddressGroup> addressGroup,
1523
        String authority) {
1524
      // TODO(ejona): can we be even stricter? Like terminating?
1525
      checkState(!terminated, "Channel is terminated");
1✔
1526
      long oobChannelCreationTime = timeProvider.currentTimeNanos();
1✔
1527
      InternalLogId oobLogId = InternalLogId.allocate("OobChannel", /*details=*/ null);
1✔
1528
      InternalLogId subchannelLogId =
1✔
1529
          InternalLogId.allocate("Subchannel-OOB", /*details=*/ authority);
1✔
1530
      ChannelTracer oobChannelTracer =
1✔
1531
          new ChannelTracer(
1532
              oobLogId, maxTraceEvents, oobChannelCreationTime,
1✔
1533
              "OobChannel for " + addressGroup);
1534
      final OobChannel oobChannel = new OobChannel(
1✔
1535
          authority, balancerRpcExecutorPool, oobTransportFactory.getScheduledExecutorService(),
1✔
1536
          syncContext, callTracerFactory.create(), oobChannelTracer, channelz, timeProvider);
1✔
1537
      channelTracer.reportEvent(new ChannelTrace.Event.Builder()
1✔
1538
          .setDescription("Child OobChannel created")
1✔
1539
          .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
1✔
1540
          .setTimestampNanos(oobChannelCreationTime)
1✔
1541
          .setChannelRef(oobChannel)
1✔
1542
          .build());
1✔
1543
      ChannelTracer subchannelTracer =
1✔
1544
          new ChannelTracer(subchannelLogId, maxTraceEvents, oobChannelCreationTime,
1✔
1545
              "Subchannel for " + addressGroup);
1546
      ChannelLogger subchannelLogger = new ChannelLoggerImpl(subchannelTracer, timeProvider);
1✔
1547
      final class ManagedOobChannelCallback extends InternalSubchannel.Callback {
1✔
1548
        @Override
1549
        void onTerminated(InternalSubchannel is) {
1550
          oobChannels.remove(oobChannel);
1✔
1551
          channelz.removeSubchannel(is);
1✔
1552
          oobChannel.handleSubchannelTerminated();
1✔
1553
          maybeTerminateChannel();
1✔
1554
        }
1✔
1555

1556
        @Override
1557
        void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) {
1558
          // TODO(chengyuanzhang): change to let LB policies explicitly manage OOB channel's
1559
          //  state and refresh name resolution if necessary.
1560
          handleInternalSubchannelState(newState);
1✔
1561
          oobChannel.handleSubchannelStateChange(newState);
1✔
1562
        }
1✔
1563
      }
1564

1565
      final InternalSubchannel internalSubchannel = new InternalSubchannel(
1✔
1566
          addressGroup,
1567
          authority, userAgent, backoffPolicyProvider, oobTransportFactory,
1✔
1568
          oobTransportFactory.getScheduledExecutorService(), stopwatchSupplier, syncContext,
1✔
1569
          // All callback methods are run from syncContext
1570
          new ManagedOobChannelCallback(),
1571
          channelz,
1✔
1572
          callTracerFactory.create(),
1✔
1573
          subchannelTracer,
1574
          subchannelLogId,
1575
          subchannelLogger,
1576
          transportFilters);
1✔
1577
      oobChannelTracer.reportEvent(new ChannelTrace.Event.Builder()
1✔
1578
          .setDescription("Child Subchannel created")
1✔
1579
          .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
1✔
1580
          .setTimestampNanos(oobChannelCreationTime)
1✔
1581
          .setSubchannelRef(internalSubchannel)
1✔
1582
          .build());
1✔
1583
      channelz.addSubchannel(oobChannel);
1✔
1584
      channelz.addSubchannel(internalSubchannel);
1✔
1585
      oobChannel.setSubchannel(internalSubchannel);
1✔
1586
      final class AddOobChannel implements Runnable {
1✔
1587
        @Override
1588
        public void run() {
1589
          if (terminating) {
1✔
1590
            oobChannel.shutdown();
×
1591
          }
1592
          if (!terminated) {
1✔
1593
            // If channel has not terminated, it will track the subchannel and block termination
1594
            // for it.
1595
            oobChannels.add(oobChannel);
1✔
1596
          }
1597
        }
1✔
1598
      }
1599

1600
      syncContext.execute(new AddOobChannel());
1✔
1601
      return oobChannel;
1✔
1602
    }
1603

1604
    @Deprecated
1605
    @Override
1606
    public ManagedChannelBuilder<?> createResolvingOobChannelBuilder(String target) {
1607
      return createResolvingOobChannelBuilder(target, new DefaultChannelCreds())
1✔
1608
          // Override authority to keep the old behavior.
1609
          // createResolvingOobChannelBuilder(String target) will be deleted soon.
1610
          .overrideAuthority(getAuthority());
1✔
1611
    }
1612

1613
    // TODO(creamsoup) prevent main channel to shutdown if oob channel is not terminated
1614
    // TODO(zdapeng) register the channel as a subchannel of the parent channel in channelz.
1615
    @Override
1616
    public ManagedChannelBuilder<?> createResolvingOobChannelBuilder(
1617
        final String target, final ChannelCredentials channelCreds) {
1618
      checkNotNull(channelCreds, "channelCreds");
1✔
1619

1620
      final class ResolvingOobChannelBuilder
1621
          extends ForwardingChannelBuilder2<ResolvingOobChannelBuilder> {
1622
        final ManagedChannelBuilder<?> delegate;
1623

1624
        ResolvingOobChannelBuilder() {
1✔
1625
          final ClientTransportFactory transportFactory;
1626
          CallCredentials callCredentials;
1627
          if (channelCreds instanceof DefaultChannelCreds) {
1✔
1628
            transportFactory = originalTransportFactory;
1✔
1629
            callCredentials = null;
1✔
1630
          } else {
1631
            SwapChannelCredentialsResult swapResult =
1✔
1632
                originalTransportFactory.swapChannelCredentials(channelCreds);
1✔
1633
            if (swapResult == null) {
1✔
1634
              delegate = Grpc.newChannelBuilder(target, channelCreds);
×
1635
              return;
×
1636
            } else {
1637
              transportFactory = swapResult.transportFactory;
1✔
1638
              callCredentials = swapResult.callCredentials;
1✔
1639
            }
1640
          }
1641
          ClientTransportFactoryBuilder transportFactoryBuilder =
1✔
1642
              new ClientTransportFactoryBuilder() {
1✔
1643
                @Override
1644
                public ClientTransportFactory buildClientTransportFactory() {
1645
                  return transportFactory;
1✔
1646
                }
1647
              };
1648
          delegate = new ManagedChannelImplBuilder(
1✔
1649
              target,
1650
              channelCreds,
1651
              callCredentials,
1652
              transportFactoryBuilder,
1653
              new FixedPortProvider(nameResolverArgs.getDefaultPort()))
1✔
1654
              .nameResolverRegistry(nameResolverRegistry);
1✔
1655
        }
1✔
1656

1657
        @Override
1658
        protected ManagedChannelBuilder<?> delegate() {
1659
          return delegate;
1✔
1660
        }
1661
      }
1662

1663
      checkState(!terminated, "Channel is terminated");
1✔
1664

1665
      @SuppressWarnings("deprecation")
1666
      ResolvingOobChannelBuilder builder = new ResolvingOobChannelBuilder();
1✔
1667

1668
      return builder
1✔
1669
          // TODO(zdapeng): executors should not outlive the parent channel.
1670
          .executor(executor)
1✔
1671
          .offloadExecutor(offloadExecutorHolder.getExecutor())
1✔
1672
          .maxTraceEvents(maxTraceEvents)
1✔
1673
          .proxyDetector(nameResolverArgs.getProxyDetector())
1✔
1674
          .userAgent(userAgent);
1✔
1675
    }
1676

1677
    @Override
1678
    public ChannelCredentials getUnsafeChannelCredentials() {
1679
      if (originalChannelCreds == null) {
×
1680
        return new DefaultChannelCreds();
×
1681
      }
1682
      return originalChannelCreds;
×
1683
    }
1684

1685
    @Override
1686
    public void updateOobChannelAddresses(ManagedChannel channel, EquivalentAddressGroup eag) {
1687
      updateOobChannelAddresses(channel, Collections.singletonList(eag));
×
1688
    }
×
1689

1690
    @Override
1691
    public void updateOobChannelAddresses(ManagedChannel channel,
1692
        List<EquivalentAddressGroup> eag) {
1693
      checkArgument(channel instanceof OobChannel,
1✔
1694
          "channel must have been returned from createOobChannel");
1695
      ((OobChannel) channel).updateAddresses(eag);
1✔
1696
    }
1✔
1697

1698
    @Override
1699
    public String getAuthority() {
1700
      return ManagedChannelImpl.this.authority();
1✔
1701
    }
1702

1703
    @Override
1704
    public SynchronizationContext getSynchronizationContext() {
1705
      return syncContext;
1✔
1706
    }
1707

1708
    @Override
1709
    public ScheduledExecutorService getScheduledExecutorService() {
1710
      return scheduledExecutor;
1✔
1711
    }
1712

1713
    @Override
1714
    public ChannelLogger getChannelLogger() {
1715
      return channelLogger;
1✔
1716
    }
1717

1718
    @Override
1719
    public NameResolver.Args getNameResolverArgs() {
1720
      return nameResolverArgs;
1✔
1721
    }
1722

1723
    @Override
1724
    public NameResolverRegistry getNameResolverRegistry() {
1725
      return nameResolverRegistry;
1✔
1726
    }
1727

1728
    /**
1729
     * A placeholder for channel creds if user did not specify channel creds for the channel.
1730
     */
1731
    // TODO(zdapeng): get rid of this class and let all ChannelBuilders always provide a non-null
1732
    //     channel creds.
1733
    final class DefaultChannelCreds extends ChannelCredentials {
1✔
1734
      @Override
1735
      public ChannelCredentials withoutBearerTokens() {
1736
        return this;
×
1737
      }
1738
    }
1739
  }
1740

1741
  final class NameResolverListener extends NameResolver.Listener2 {
1742
    final LbHelperImpl helper;
1743
    final NameResolver resolver;
1744

1745
    /**
     * Binds this listener to the helper and resolver it was created for.
     *
     * @throws NullPointerException if either argument is null
     */
    NameResolverListener(LbHelperImpl helperImpl, NameResolver resolver) {
      checkNotNull(helperImpl, "helperImpl");
      checkNotNull(resolver, "resolver");
      this.helper = helperImpl;
      this.resolver = resolver;
    }
1✔
1749

1750
    @Override
1751
    public void onResult(final ResolutionResult resolutionResult) {
1752
      final class NamesResolved implements Runnable {
1✔
1753

1754
        @SuppressWarnings("ReferenceEquality")
1755
        @Override
1756
        public void run() {
1757
          if (ManagedChannelImpl.this.nameResolver != resolver) {
1✔
1758
            return;
1✔
1759
          }
1760

1761
          List<EquivalentAddressGroup> servers = resolutionResult.getAddresses();
1✔
1762
          channelLogger.log(
1✔
1763
              ChannelLogLevel.DEBUG,
1764
              "Resolved address: {0}, config={1}",
1765
              servers,
1766
              resolutionResult.getAttributes());
1✔
1767

1768
          if (lastResolutionState != ResolutionState.SUCCESS) {
1✔
1769
            channelLogger.log(ChannelLogLevel.INFO, "Address resolved: {0}", servers);
1✔
1770
            lastResolutionState = ResolutionState.SUCCESS;
1✔
1771
          }
1772

1773
          ConfigOrError configOrError = resolutionResult.getServiceConfig();
1✔
1774
          ResolutionResultListener resolutionResultListener = resolutionResult.getAttributes()
1✔
1775
              .get(RetryingNameResolver.RESOLUTION_RESULT_LISTENER_KEY);
1✔
1776
          InternalConfigSelector resolvedConfigSelector =
1✔
1777
              resolutionResult.getAttributes().get(InternalConfigSelector.KEY);
1✔
1778
          ManagedChannelServiceConfig validServiceConfig =
1779
              configOrError != null && configOrError.getConfig() != null
1✔
1780
                  ? (ManagedChannelServiceConfig) configOrError.getConfig()
1✔
1781
                  : null;
1✔
1782
          Status serviceConfigError = configOrError != null ? configOrError.getError() : null;
1✔
1783

1784
          ManagedChannelServiceConfig effectiveServiceConfig;
1785
          if (!lookUpServiceConfig) {
1✔
1786
            if (validServiceConfig != null) {
1✔
1787
              channelLogger.log(
1✔
1788
                  ChannelLogLevel.INFO,
1789
                  "Service config from name resolver discarded by channel settings");
1790
            }
1791
            effectiveServiceConfig =
1792
                defaultServiceConfig == null ? EMPTY_SERVICE_CONFIG : defaultServiceConfig;
1✔
1793
            if (resolvedConfigSelector != null) {
1✔
1794
              channelLogger.log(
1✔
1795
                  ChannelLogLevel.INFO,
1796
                  "Config selector from name resolver discarded by channel settings");
1797
            }
1798
            realChannel.updateConfigSelector(effectiveServiceConfig.getDefaultConfigSelector());
1✔
1799
          } else {
1800
            // Try to use config if returned from name resolver
1801
            // Otherwise, try to use the default config if available
1802
            if (validServiceConfig != null) {
1✔
1803
              effectiveServiceConfig = validServiceConfig;
1✔
1804
              if (resolvedConfigSelector != null) {
1✔
1805
                realChannel.updateConfigSelector(resolvedConfigSelector);
1✔
1806
                if (effectiveServiceConfig.getDefaultConfigSelector() != null) {
1✔
1807
                  channelLogger.log(
×
1808
                      ChannelLogLevel.DEBUG,
1809
                      "Method configs in service config will be discarded due to presence of"
1810
                          + "config-selector");
1811
                }
1812
              } else {
1813
                realChannel.updateConfigSelector(effectiveServiceConfig.getDefaultConfigSelector());
1✔
1814
              }
1815
            } else if (defaultServiceConfig != null) {
1✔
1816
              effectiveServiceConfig = defaultServiceConfig;
1✔
1817
              realChannel.updateConfigSelector(effectiveServiceConfig.getDefaultConfigSelector());
1✔
1818
              channelLogger.log(
1✔
1819
                  ChannelLogLevel.INFO,
1820
                  "Received no service config, using default service config");
1821
            } else if (serviceConfigError != null) {
1✔
1822
              if (!serviceConfigUpdated) {
1✔
1823
                // First DNS lookup has invalid service config, and cannot fall back to default
1824
                channelLogger.log(
1✔
1825
                    ChannelLogLevel.INFO,
1826
                    "Fallback to error due to invalid first service config without default config");
1827
                // This error could be an "inappropriate" control plane error that should not bleed
1828
                // through to client code using gRPC. We let them flow through here to the LB as
1829
                // we later check for these error codes when investigating pick results in
1830
                // GrpcUtil.getTransportFromPickResult().
1831
                onError(configOrError.getError());
1✔
1832
                if (resolutionResultListener != null) {
1✔
1833
                  resolutionResultListener.resolutionAttempted(configOrError.getError());
1✔
1834
                }
1835
                return;
1✔
1836
              } else {
1837
                effectiveServiceConfig = lastServiceConfig;
1✔
1838
              }
1839
            } else {
1840
              effectiveServiceConfig = EMPTY_SERVICE_CONFIG;
1✔
1841
              realChannel.updateConfigSelector(null);
1✔
1842
            }
1843
            if (!effectiveServiceConfig.equals(lastServiceConfig)) {
1✔
1844
              channelLogger.log(
1✔
1845
                  ChannelLogLevel.INFO,
1846
                  "Service config changed{0}",
1847
                  effectiveServiceConfig == EMPTY_SERVICE_CONFIG ? " to empty" : "");
1✔
1848
              lastServiceConfig = effectiveServiceConfig;
1✔
1849
              transportProvider.throttle = effectiveServiceConfig.getRetryThrottling();
1✔
1850
            }
1851

1852
            try {
1853
              // TODO(creamsoup): when `servers` is empty and lastResolutionStateCopy == SUCCESS
1854
              //  and lbNeedAddress, it shouldn't call the handleServiceConfigUpdate. But,
1855
              //  lbNeedAddress is not deterministic
1856
              serviceConfigUpdated = true;
1✔
1857
            } catch (RuntimeException re) {
×
1858
              logger.log(
×
1859
                  Level.WARNING,
1860
                  "[" + getLogId() + "] Unexpected exception from parsing service config",
×
1861
                  re);
1862
            }
1✔
1863
          }
1864

1865
          Attributes effectiveAttrs = resolutionResult.getAttributes();
1✔
1866
          // Call LB only if it's not shutdown.  If LB is shutdown, lbHelper won't match.
1867
          if (NameResolverListener.this.helper == ManagedChannelImpl.this.lbHelper) {
1✔
1868
            Attributes.Builder attrBuilder =
1✔
1869
                effectiveAttrs.toBuilder().discard(InternalConfigSelector.KEY);
1✔
1870
            Map<String, ?> healthCheckingConfig =
1✔
1871
                effectiveServiceConfig.getHealthCheckingConfig();
1✔
1872
            if (healthCheckingConfig != null) {
1✔
1873
              attrBuilder
1✔
1874
                  .set(LoadBalancer.ATTR_HEALTH_CHECKING_CONFIG, healthCheckingConfig)
1✔
1875
                  .build();
1✔
1876
            }
1877
            Attributes attributes = attrBuilder.build();
1✔
1878

1879
            Status addressAcceptanceStatus = helper.lb.tryAcceptResolvedAddresses(
1✔
1880
                ResolvedAddresses.newBuilder()
1✔
1881
                    .setAddresses(servers)
1✔
1882
                    .setAttributes(attributes)
1✔
1883
                    .setLoadBalancingPolicyConfig(effectiveServiceConfig.getLoadBalancingConfig())
1✔
1884
                    .build());
1✔
1885
            // If a listener is provided, let it know if the addresses were accepted.
1886
            if (resolutionResultListener != null) {
1✔
1887
              resolutionResultListener.resolutionAttempted(addressAcceptanceStatus);
1✔
1888
            }
1889
          }
1890
        }
1✔
1891
      }
1892

1893
      syncContext.execute(new NamesResolved());
1✔
1894
    }
1✔
1895

1896
    @Override
1897
    public void onError(final Status error) {
1898
      checkArgument(!error.isOk(), "the error status must not be OK");
1✔
1899
      final class NameResolverErrorHandler implements Runnable {
1✔
1900
        @Override
1901
        public void run() {
1902
          handleErrorInSyncContext(error);
1✔
1903
        }
1✔
1904
      }
1905

1906
      syncContext.execute(new NameResolverErrorHandler());
1✔
1907
    }
1✔
1908

1909
    private void handleErrorInSyncContext(Status error) {
1910
      logger.log(Level.WARNING, "[{0}] Failed to resolve name. status={1}",
1✔
1911
          new Object[] {getLogId(), error});
1✔
1912
      realChannel.onConfigError();
1✔
1913
      if (lastResolutionState != ResolutionState.ERROR) {
1✔
1914
        channelLogger.log(ChannelLogLevel.WARNING, "Failed to resolve name: {0}", error);
1✔
1915
        lastResolutionState = ResolutionState.ERROR;
1✔
1916
      }
1917
      // Call LB only if it's not shutdown.  If LB is shutdown, lbHelper won't match.
1918
      if (NameResolverListener.this.helper != ManagedChannelImpl.this.lbHelper) {
1✔
1919
        return;
1✔
1920
      }
1921

1922
      helper.lb.handleNameResolutionError(error);
1✔
1923
    }
1✔
1924
  }
1925

1926
  private final class SubchannelImpl extends AbstractSubchannel {
1927
    final CreateSubchannelArgs args;
1928
    final InternalLogId subchannelLogId;
1929
    final ChannelLoggerImpl subchannelLogger;
1930
    final ChannelTracer subchannelTracer;
1931
    List<EquivalentAddressGroup> addressGroups;
1932
    InternalSubchannel subchannel;
1933
    boolean started;
1934
    boolean shutdown;
1935
    ScheduledHandle delayedShutdownTask;
1936

1937
    SubchannelImpl(CreateSubchannelArgs args) {
1✔
1938
      checkNotNull(args, "args");
1✔
1939
      addressGroups = args.getAddresses();
1✔
1940
      if (authorityOverride != null) {
1✔
1941
        List<EquivalentAddressGroup> eagsWithoutOverrideAttr =
1✔
1942
            stripOverrideAuthorityAttributes(args.getAddresses());
1✔
1943
        args = args.toBuilder().setAddresses(eagsWithoutOverrideAttr).build();
1✔
1944
      }
1945
      this.args = args;
1✔
1946
      subchannelLogId = InternalLogId.allocate("Subchannel", /*details=*/ authority());
1✔
1947
      subchannelTracer = new ChannelTracer(
1✔
1948
          subchannelLogId, maxTraceEvents, timeProvider.currentTimeNanos(),
1✔
1949
          "Subchannel for " + args.getAddresses());
1✔
1950
      subchannelLogger = new ChannelLoggerImpl(subchannelTracer, timeProvider);
1✔
1951
    }
1✔
1952

1953
    @Override
1954
    public void start(final SubchannelStateListener listener) {
1955
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1956
      checkState(!started, "already started");
1✔
1957
      checkState(!shutdown, "already shutdown");
1✔
1958
      checkState(!terminating, "Channel is being terminated");
1✔
1959
      started = true;
1✔
1960
      final class ManagedInternalSubchannelCallback extends InternalSubchannel.Callback {
1✔
1961
        // All callbacks are run in syncContext
1962
        @Override
1963
        void onTerminated(InternalSubchannel is) {
1964
          subchannels.remove(is);
1✔
1965
          channelz.removeSubchannel(is);
1✔
1966
          maybeTerminateChannel();
1✔
1967
        }
1✔
1968

1969
        @Override
1970
        void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) {
1971
          checkState(listener != null, "listener is null");
1✔
1972
          listener.onSubchannelState(newState);
1✔
1973
        }
1✔
1974

1975
        @Override
1976
        void onInUse(InternalSubchannel is) {
1977
          inUseStateAggregator.updateObjectInUse(is, true);
1✔
1978
        }
1✔
1979

1980
        @Override
1981
        void onNotInUse(InternalSubchannel is) {
1982
          inUseStateAggregator.updateObjectInUse(is, false);
1✔
1983
        }
1✔
1984
      }
1985

1986
      final InternalSubchannel internalSubchannel = new InternalSubchannel(
1✔
1987
          args.getAddresses(),
1✔
1988
          authority(),
1✔
1989
          userAgent,
1✔
1990
          backoffPolicyProvider,
1✔
1991
          transportFactory,
1✔
1992
          transportFactory.getScheduledExecutorService(),
1✔
1993
          stopwatchSupplier,
1✔
1994
          syncContext,
1995
          new ManagedInternalSubchannelCallback(),
1996
          channelz,
1✔
1997
          callTracerFactory.create(),
1✔
1998
          subchannelTracer,
1999
          subchannelLogId,
2000
          subchannelLogger,
2001
          transportFilters);
1✔
2002

2003
      channelTracer.reportEvent(new ChannelTrace.Event.Builder()
1✔
2004
          .setDescription("Child Subchannel started")
1✔
2005
          .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
1✔
2006
          .setTimestampNanos(timeProvider.currentTimeNanos())
1✔
2007
          .setSubchannelRef(internalSubchannel)
1✔
2008
          .build());
1✔
2009

2010
      this.subchannel = internalSubchannel;
1✔
2011
      channelz.addSubchannel(internalSubchannel);
1✔
2012
      subchannels.add(internalSubchannel);
1✔
2013
    }
1✔
2014

2015
    @Override
2016
    InternalInstrumented<ChannelStats> getInstrumentedInternalSubchannel() {
2017
      checkState(started, "not started");
1✔
2018
      return subchannel;
1✔
2019
    }
2020

2021
    @Override
2022
    public void shutdown() {
2023
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
2024
      if (subchannel == null) {
1✔
2025
        // start() was not successful
2026
        shutdown = true;
×
2027
        return;
×
2028
      }
2029
      if (shutdown) {
1✔
2030
        if (terminating && delayedShutdownTask != null) {
1✔
2031
          // shutdown() was previously called when terminating == false, thus a delayed shutdown()
2032
          // was scheduled.  Now since terminating == true, We should expedite the shutdown.
2033
          delayedShutdownTask.cancel();
×
2034
          delayedShutdownTask = null;
×
2035
          // Will fall through to the subchannel.shutdown() at the end.
2036
        } else {
2037
          return;
1✔
2038
        }
2039
      } else {
2040
        shutdown = true;
1✔
2041
      }
2042
      // Add a delay to shutdown to deal with the race between 1) a transport being picked and
2043
      // newStream() being called on it, and 2) its Subchannel is shut down by LoadBalancer (e.g.,
2044
      // because of address change, or because LoadBalancer is shutdown by Channel entering idle
2045
      // mode). If (2) wins, the app will see a spurious error. We work around this by delaying
2046
      // shutdown of Subchannel for a few seconds here.
2047
      //
2048
      // TODO(zhangkun83): consider a better approach
2049
      // (https://github.com/grpc/grpc-java/issues/2562).
2050
      if (!terminating) {
1✔
2051
        final class ShutdownSubchannel implements Runnable {
1✔
2052
          @Override
2053
          public void run() {
2054
            subchannel.shutdown(SUBCHANNEL_SHUTDOWN_STATUS);
1✔
2055
          }
1✔
2056
        }
2057

2058
        delayedShutdownTask = syncContext.schedule(
1✔
2059
            new LogExceptionRunnable(new ShutdownSubchannel()),
2060
            SUBCHANNEL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS,
2061
            transportFactory.getScheduledExecutorService());
1✔
2062
        return;
1✔
2063
      }
2064
      // When terminating == true, no more real streams will be created. It's safe and also
2065
      // desirable to shutdown timely.
2066
      subchannel.shutdown(SHUTDOWN_STATUS);
1✔
2067
    }
1✔
2068

2069
    @Override
2070
    public void requestConnection() {
2071
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
2072
      checkState(started, "not started");
1✔
2073
      subchannel.obtainActiveTransport();
1✔
2074
    }
1✔
2075

2076
    @Override
2077
    public List<EquivalentAddressGroup> getAllAddresses() {
2078
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
2079
      checkState(started, "not started");
1✔
2080
      return addressGroups;
1✔
2081
    }
2082

2083
    @Override
2084
    public Attributes getAttributes() {
2085
      return args.getAttributes();
1✔
2086
    }
2087

2088
    @Override
2089
    public String toString() {
2090
      return subchannelLogId.toString();
1✔
2091
    }
2092

2093
    @Override
2094
    public Channel asChannel() {
2095
      checkState(started, "not started");
1✔
2096
      return new SubchannelChannel(
1✔
2097
          subchannel, balancerRpcExecutorHolder.getExecutor(),
1✔
2098
          transportFactory.getScheduledExecutorService(),
1✔
2099
          callTracerFactory.create(),
1✔
2100
          new AtomicReference<InternalConfigSelector>(null));
2101
    }
2102

2103
    @Override
2104
    public Object getInternalSubchannel() {
2105
      checkState(started, "Subchannel is not started");
1✔
2106
      return subchannel;
1✔
2107
    }
2108

2109
    @Override
2110
    public ChannelLogger getChannelLogger() {
2111
      return subchannelLogger;
1✔
2112
    }
2113

2114
    @Override
2115
    public void updateAddresses(List<EquivalentAddressGroup> addrs) {
2116
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
2117
      addressGroups = addrs;
1✔
2118
      if (authorityOverride != null) {
1✔
2119
        addrs = stripOverrideAuthorityAttributes(addrs);
1✔
2120
      }
2121
      subchannel.updateAddresses(addrs);
1✔
2122
    }
1✔
2123

2124
    private List<EquivalentAddressGroup> stripOverrideAuthorityAttributes(
2125
        List<EquivalentAddressGroup> eags) {
2126
      List<EquivalentAddressGroup> eagsWithoutOverrideAttr = new ArrayList<>();
1✔
2127
      for (EquivalentAddressGroup eag : eags) {
1✔
2128
        EquivalentAddressGroup eagWithoutOverrideAttr = new EquivalentAddressGroup(
1✔
2129
            eag.getAddresses(),
1✔
2130
            eag.getAttributes().toBuilder().discard(ATTR_AUTHORITY_OVERRIDE).build());
1✔
2131
        eagsWithoutOverrideAttr.add(eagWithoutOverrideAttr);
1✔
2132
      }
1✔
2133
      return Collections.unmodifiableList(eagsWithoutOverrideAttr);
1✔
2134
    }
2135
  }
2136

2137
  @Override
2138
  public String toString() {
2139
    return MoreObjects.toStringHelper(this)
1✔
2140
        .add("logId", logId.getId())
1✔
2141
        .add("target", target)
1✔
2142
        .toString();
1✔
2143
  }
2144

2145
  /**
2146
   * Called from syncContext.
2147
   */
2148
  private final class DelayedTransportListener implements ManagedClientTransport.Listener {
1✔
2149
    @Override
2150
    public void transportShutdown(Status s) {
2151
      checkState(shutdown.get(), "Channel must have been shut down");
1✔
2152
    }
1✔
2153

2154
    @Override
2155
    public void transportReady() {
2156
      // Don't care
2157
    }
×
2158

2159
    @Override
2160
    public Attributes filterTransport(Attributes attributes) {
2161
      return attributes;
×
2162
    }
2163

2164
    @Override
2165
    public void transportInUse(final boolean inUse) {
2166
      inUseStateAggregator.updateObjectInUse(delayedTransport, inUse);
1✔
2167
    }
1✔
2168

2169
    @Override
2170
    public void transportTerminated() {
2171
      checkState(shutdown.get(), "Channel must have been shut down");
1✔
2172
      terminating = true;
1✔
2173
      shutdownNameResolverAndLoadBalancer(false);
1✔
2174
      // No need to call channelStateManager since we are already in SHUTDOWN state.
2175
      // Until LoadBalancer is shutdown, it may still create new subchannels.  We catch them
2176
      // here.
2177
      maybeShutdownNowSubchannels();
1✔
2178
      maybeTerminateChannel();
1✔
2179
    }
1✔
2180
  }
2181

2182
  /**
2183
   * Must be accessed from syncContext.
2184
   */
2185
  private final class IdleModeStateAggregator extends InUseStateAggregator<Object> {
1✔
2186
    @Override
2187
    protected void handleInUse() {
2188
      exitIdleMode();
1✔
2189
    }
1✔
2190

2191
    @Override
2192
    protected void handleNotInUse() {
2193
      if (shutdown.get()) {
1✔
2194
        return;
1✔
2195
      }
2196
      rescheduleIdleTimer();
1✔
2197
    }
1✔
2198
  }
2199

2200
  /**
2201
   * Lazily request for Executor from an executor pool.
2202
   * Also act as an Executor directly to simply run a cmd
2203
   */
2204
  @VisibleForTesting
2205
  static final class ExecutorHolder implements Executor {
2206
    private final ObjectPool<? extends Executor> pool;
2207
    private Executor executor;
2208

2209
    ExecutorHolder(ObjectPool<? extends Executor> executorPool) {
1✔
2210
      this.pool = checkNotNull(executorPool, "executorPool");
1✔
2211
    }
1✔
2212

2213
    synchronized Executor getExecutor() {
2214
      if (executor == null) {
1✔
2215
        executor = checkNotNull(pool.getObject(), "%s.getObject()", executor);
1✔
2216
      }
2217
      return executor;
1✔
2218
    }
2219

2220
    synchronized void release() {
2221
      if (executor != null) {
1✔
2222
        executor = pool.returnObject(executor);
1✔
2223
      }
2224
    }
1✔
2225

2226
    @Override
2227
    public void execute(Runnable command) {
2228
      getExecutor().execute(command);
1✔
2229
    }
1✔
2230
  }
2231

2232
  private static final class RestrictedScheduledExecutor implements ScheduledExecutorService {
2233
    final ScheduledExecutorService delegate;
2234

2235
    private RestrictedScheduledExecutor(ScheduledExecutorService delegate) {
1✔
2236
      this.delegate = checkNotNull(delegate, "delegate");
1✔
2237
    }
1✔
2238

2239
    @Override
2240
    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
2241
      return delegate.schedule(callable, delay, unit);
×
2242
    }
2243

2244
    @Override
2245
    public ScheduledFuture<?> schedule(Runnable cmd, long delay, TimeUnit unit) {
2246
      return delegate.schedule(cmd, delay, unit);
1✔
2247
    }
2248

2249
    @Override
2250
    public ScheduledFuture<?> scheduleAtFixedRate(
2251
        Runnable command, long initialDelay, long period, TimeUnit unit) {
2252
      return delegate.scheduleAtFixedRate(command, initialDelay, period, unit);
×
2253
    }
2254

2255
    @Override
2256
    public ScheduledFuture<?> scheduleWithFixedDelay(
2257
        Runnable command, long initialDelay, long delay, TimeUnit unit) {
2258
      return delegate.scheduleWithFixedDelay(command, initialDelay, delay, unit);
×
2259
    }
2260

2261
    @Override
2262
    public boolean awaitTermination(long timeout, TimeUnit unit)
2263
        throws InterruptedException {
2264
      return delegate.awaitTermination(timeout, unit);
×
2265
    }
2266

2267
    @Override
2268
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
2269
        throws InterruptedException {
2270
      return delegate.invokeAll(tasks);
×
2271
    }
2272

2273
    @Override
2274
    public <T> List<Future<T>> invokeAll(
2275
        Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
2276
        throws InterruptedException {
2277
      return delegate.invokeAll(tasks, timeout, unit);
×
2278
    }
2279

2280
    @Override
2281
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
2282
        throws InterruptedException, ExecutionException {
2283
      return delegate.invokeAny(tasks);
×
2284
    }
2285

2286
    @Override
2287
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
2288
        throws InterruptedException, ExecutionException, TimeoutException {
2289
      return delegate.invokeAny(tasks, timeout, unit);
×
2290
    }
2291

2292
    @Override
2293
    public boolean isShutdown() {
2294
      return delegate.isShutdown();
×
2295
    }
2296

2297
    @Override
2298
    public boolean isTerminated() {
2299
      return delegate.isTerminated();
×
2300
    }
2301

2302
    @Override
2303
    public void shutdown() {
2304
      throw new UnsupportedOperationException("Restricted: shutdown() is not allowed");
1✔
2305
    }
2306

2307
    @Override
2308
    public List<Runnable> shutdownNow() {
2309
      throw new UnsupportedOperationException("Restricted: shutdownNow() is not allowed");
1✔
2310
    }
2311

2312
    @Override
2313
    public <T> Future<T> submit(Callable<T> task) {
2314
      return delegate.submit(task);
×
2315
    }
2316

2317
    @Override
2318
    public Future<?> submit(Runnable task) {
2319
      return delegate.submit(task);
×
2320
    }
2321

2322
    @Override
2323
    public <T> Future<T> submit(Runnable task, T result) {
2324
      return delegate.submit(task, result);
×
2325
    }
2326

2327
    @Override
2328
    public void execute(Runnable command) {
2329
      delegate.execute(command);
×
2330
    }
×
2331
  }
2332

2333
  /**
   * A ResolutionState indicates the status of the last name resolution.
   */
  enum ResolutionState {
    NO_RESOLUTION,
    SUCCESS,
    ERROR
  }
2341
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc