• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19211

08 May 2024 10:50PM UTC coverage: 88.309% (-0.02%) from 88.328%
#19211

push

github

ejona86
xds: Plumb locality in xds_cluster_impl and weighted_target

As part of gRFC A78:

> To support the locality label in the WRR metrics, we will extend the
> `weighted_target` LB policy (see A28) to define a resolver attribute
> that indicates the name of its child. This attribute will be passed
> down to each of its children with the appropriate value, so that any
> LB policy that sits underneath the `weighted_target` policy will be
> able to use it.

xds_cluster_impl is involved because it uses the child names in the
AddressFilter, which must match the names used by weighted_target.
Instead of using Locality.toString() in multiple policies and assuming
the policies agree, we now have xds_cluster_impl decide the locality's
name and pass it down explicitly. This allows us to change the name
format to match gRFC A78:

> If locality information is available, the value of this label will be
> of the form `{region="${REGION}", zone="${ZONE}",
> sub_zone="${SUB_ZONE}"}`, where `${REGION}`, `${ZONE}`, and
> `${SUB_ZONE}` are replaced with the actual values. If no locality
> information is available, the label will be set to the empty string.

31515 of 35687 relevant lines covered (88.31%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.46
/../core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
1
/*
2
 * Copyright 2016 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static com.google.common.base.Preconditions.checkState;
22
import static io.grpc.ClientStreamTracer.NAME_RESOLUTION_DELAYED;
23
import static io.grpc.ConnectivityState.CONNECTING;
24
import static io.grpc.ConnectivityState.IDLE;
25
import static io.grpc.ConnectivityState.SHUTDOWN;
26
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
27
import static io.grpc.EquivalentAddressGroup.ATTR_AUTHORITY_OVERRIDE;
28

29
import com.google.common.annotations.VisibleForTesting;
30
import com.google.common.base.MoreObjects;
31
import com.google.common.base.Stopwatch;
32
import com.google.common.base.Supplier;
33
import com.google.common.util.concurrent.ListenableFuture;
34
import com.google.common.util.concurrent.SettableFuture;
35
import io.grpc.Attributes;
36
import io.grpc.CallCredentials;
37
import io.grpc.CallOptions;
38
import io.grpc.Channel;
39
import io.grpc.ChannelCredentials;
40
import io.grpc.ChannelLogger;
41
import io.grpc.ChannelLogger.ChannelLogLevel;
42
import io.grpc.ClientCall;
43
import io.grpc.ClientInterceptor;
44
import io.grpc.ClientInterceptors;
45
import io.grpc.ClientStreamTracer;
46
import io.grpc.ClientTransportFilter;
47
import io.grpc.CompressorRegistry;
48
import io.grpc.ConnectivityState;
49
import io.grpc.ConnectivityStateInfo;
50
import io.grpc.Context;
51
import io.grpc.Deadline;
52
import io.grpc.DecompressorRegistry;
53
import io.grpc.EquivalentAddressGroup;
54
import io.grpc.ForwardingChannelBuilder2;
55
import io.grpc.ForwardingClientCall;
56
import io.grpc.Grpc;
57
import io.grpc.InternalChannelz;
58
import io.grpc.InternalChannelz.ChannelStats;
59
import io.grpc.InternalChannelz.ChannelTrace;
60
import io.grpc.InternalConfigSelector;
61
import io.grpc.InternalInstrumented;
62
import io.grpc.InternalLogId;
63
import io.grpc.InternalWithLogId;
64
import io.grpc.LoadBalancer;
65
import io.grpc.LoadBalancer.CreateSubchannelArgs;
66
import io.grpc.LoadBalancer.PickResult;
67
import io.grpc.LoadBalancer.PickSubchannelArgs;
68
import io.grpc.LoadBalancer.ResolvedAddresses;
69
import io.grpc.LoadBalancer.SubchannelPicker;
70
import io.grpc.LoadBalancer.SubchannelStateListener;
71
import io.grpc.ManagedChannel;
72
import io.grpc.ManagedChannelBuilder;
73
import io.grpc.Metadata;
74
import io.grpc.MethodDescriptor;
75
import io.grpc.MetricInstrumentRegistry;
76
import io.grpc.MetricRecorder;
77
import io.grpc.NameResolver;
78
import io.grpc.NameResolver.ConfigOrError;
79
import io.grpc.NameResolver.ResolutionResult;
80
import io.grpc.NameResolverProvider;
81
import io.grpc.NameResolverRegistry;
82
import io.grpc.ProxyDetector;
83
import io.grpc.Status;
84
import io.grpc.SynchronizationContext;
85
import io.grpc.SynchronizationContext.ScheduledHandle;
86
import io.grpc.internal.AutoConfiguredLoadBalancerFactory.AutoConfiguredLoadBalancer;
87
import io.grpc.internal.ClientCallImpl.ClientStreamProvider;
88
import io.grpc.internal.ClientTransportFactory.SwapChannelCredentialsResult;
89
import io.grpc.internal.ManagedChannelImplBuilder.ClientTransportFactoryBuilder;
90
import io.grpc.internal.ManagedChannelImplBuilder.FixedPortProvider;
91
import io.grpc.internal.ManagedChannelServiceConfig.MethodInfo;
92
import io.grpc.internal.ManagedChannelServiceConfig.ServiceConfigConvertedSelector;
93
import io.grpc.internal.RetriableStream.ChannelBufferMeter;
94
import io.grpc.internal.RetriableStream.Throttle;
95
import io.grpc.internal.RetryingNameResolver.ResolutionResultListener;
96
import java.net.SocketAddress;
97
import java.net.URI;
98
import java.net.URISyntaxException;
99
import java.util.ArrayList;
100
import java.util.Collection;
101
import java.util.Collections;
102
import java.util.HashSet;
103
import java.util.LinkedHashSet;
104
import java.util.List;
105
import java.util.Map;
106
import java.util.Set;
107
import java.util.concurrent.Callable;
108
import java.util.concurrent.CountDownLatch;
109
import java.util.concurrent.ExecutionException;
110
import java.util.concurrent.Executor;
111
import java.util.concurrent.Future;
112
import java.util.concurrent.ScheduledExecutorService;
113
import java.util.concurrent.ScheduledFuture;
114
import java.util.concurrent.TimeUnit;
115
import java.util.concurrent.TimeoutException;
116
import java.util.concurrent.atomic.AtomicBoolean;
117
import java.util.concurrent.atomic.AtomicReference;
118
import java.util.logging.Level;
119
import java.util.logging.Logger;
120
import java.util.regex.Pattern;
121
import javax.annotation.Nullable;
122
import javax.annotation.concurrent.GuardedBy;
123
import javax.annotation.concurrent.ThreadSafe;
124

125
/** A communication channel for making outgoing RPCs. */
126
@ThreadSafe
127
final class ManagedChannelImpl extends ManagedChannel implements
128
    InternalInstrumented<ChannelStats> {
129
  @VisibleForTesting
130
  static final Logger logger = Logger.getLogger(ManagedChannelImpl.class.getName());
1✔
131

132
  // Matching this pattern means the target string is a URI target or at least intended to be one.
133
  // A URI target must be an absolute hierarchical URI.
134
  // From RFC 2396: scheme = alpha *( alpha | digit | "+" | "-" | "." )
135
  @VisibleForTesting
136
  static final Pattern URI_PATTERN = Pattern.compile("[a-zA-Z][a-zA-Z0-9+.-]*:/.*");
1✔
137

138
  static final long IDLE_TIMEOUT_MILLIS_DISABLE = -1;
139

140
  static final long SUBCHANNEL_SHUTDOWN_DELAY_SECONDS = 5;
141

142
  @VisibleForTesting
143
  static final Status SHUTDOWN_NOW_STATUS =
1✔
144
      Status.UNAVAILABLE.withDescription("Channel shutdownNow invoked");
1✔
145

146
  @VisibleForTesting
147
  static final Status SHUTDOWN_STATUS =
1✔
148
      Status.UNAVAILABLE.withDescription("Channel shutdown invoked");
1✔
149

150
  @VisibleForTesting
151
  static final Status SUBCHANNEL_SHUTDOWN_STATUS =
1✔
152
      Status.UNAVAILABLE.withDescription("Subchannel shutdown invoked");
1✔
153

154
  private static final ManagedChannelServiceConfig EMPTY_SERVICE_CONFIG =
155
      ManagedChannelServiceConfig.empty();
1✔
156
  private static final InternalConfigSelector INITIAL_PENDING_SELECTOR =
1✔
157
      new InternalConfigSelector() {
1✔
158
        @Override
159
        public Result selectConfig(PickSubchannelArgs args) {
160
          throw new IllegalStateException("Resolution is pending");
×
161
        }
162
      };
163
  private static final LoadBalancer.PickDetailsConsumer NOOP_PICK_DETAILS_CONSUMER =
1✔
164
      new LoadBalancer.PickDetailsConsumer() {};
1✔
165

166
  private final InternalLogId logId;
167
  private final String target;
168
  @Nullable
169
  private final String authorityOverride;
170
  private final NameResolverRegistry nameResolverRegistry;
171
  private final URI targetUri;
172
  private final NameResolverProvider nameResolverProvider;
173
  private final NameResolver.Args nameResolverArgs;
174
  private final AutoConfiguredLoadBalancerFactory loadBalancerFactory;
175
  private final ClientTransportFactory originalTransportFactory;
176
  @Nullable
177
  private final ChannelCredentials originalChannelCreds;
178
  private final ClientTransportFactory transportFactory;
179
  private final ClientTransportFactory oobTransportFactory;
180
  private final RestrictedScheduledExecutor scheduledExecutor;
181
  private final Executor executor;
182
  private final ObjectPool<? extends Executor> executorPool;
183
  private final ObjectPool<? extends Executor> balancerRpcExecutorPool;
184
  private final ExecutorHolder balancerRpcExecutorHolder;
185
  private final ExecutorHolder offloadExecutorHolder;
186
  private final TimeProvider timeProvider;
187
  private final int maxTraceEvents;
188

189
  @VisibleForTesting
1✔
190
  final SynchronizationContext syncContext = new SynchronizationContext(
191
      new Thread.UncaughtExceptionHandler() {
1✔
192
        @Override
193
        public void uncaughtException(Thread t, Throwable e) {
194
          logger.log(
1✔
195
              Level.SEVERE,
196
              "[" + getLogId() + "] Uncaught exception in the SynchronizationContext. Panic!",
1✔
197
              e);
198
          panic(e);
1✔
199
        }
1✔
200
      });
201

202
  private boolean fullStreamDecompression;
203

204
  private final DecompressorRegistry decompressorRegistry;
205
  private final CompressorRegistry compressorRegistry;
206

207
  private final Supplier<Stopwatch> stopwatchSupplier;
208
  /** The timout before entering idle mode. */
209
  private final long idleTimeoutMillis;
210

211
  private final ConnectivityStateManager channelStateManager = new ConnectivityStateManager();
1✔
212
  private final BackoffPolicy.Provider backoffPolicyProvider;
213

214
  /**
215
   * We delegate to this channel, so that we can have interceptors as necessary. If there aren't
216
   * any interceptors and the {@link io.grpc.BinaryLog} is {@code null} then this will just be a
217
   * {@link RealChannel}.
218
   */
219
  private final Channel interceptorChannel;
220

221
  private final List<ClientTransportFilter> transportFilters;
222
  @Nullable private final String userAgent;
223

224
  // Only null after channel is terminated. Must be assigned from the syncContext.
225
  private NameResolver nameResolver;
226

227
  // Must be accessed from the syncContext.
228
  private boolean nameResolverStarted;
229

230
  // null when channel is in idle mode.  Must be assigned from syncContext.
231
  @Nullable
232
  private LbHelperImpl lbHelper;
233

234
  // Must ONLY be assigned from updateSubchannelPicker(), which is called from syncContext.
235
  // null if channel is in idle mode.
236
  @Nullable
237
  private volatile SubchannelPicker subchannelPicker;
238

239
  // Must be accessed from the syncContext
240
  private boolean panicMode;
241

242
  // Must be mutated from syncContext
243
  // If any monitoring hook to be added later needs to get a snapshot of this Set, we could
244
  // switch to a ConcurrentHashMap.
245
  private final Set<InternalSubchannel> subchannels = new HashSet<>(16, .75f);
1✔
246

247
  // Must be accessed from syncContext
248
  @Nullable
249
  private Collection<RealChannel.PendingCall<?, ?>> pendingCalls;
250
  private final Object pendingCallsInUseObject = new Object();
1✔
251

252
  // Must be mutated from syncContext
253
  private final Set<OobChannel> oobChannels = new HashSet<>(1, .75f);
1✔
254

255
  // reprocess() must be run from syncContext
256
  private final DelayedClientTransport delayedTransport;
257
  private final UncommittedRetriableStreamsRegistry uncommittedRetriableStreamsRegistry
1✔
258
      = new UncommittedRetriableStreamsRegistry();
259

260
  // Shutdown states.
261
  //
262
  // Channel's shutdown process:
263
  // 1. shutdown(): stop accepting new calls from applications
264
  //   1a shutdown <- true
265
  //   1b subchannelPicker <- null
266
  //   1c delayedTransport.shutdown()
267
  // 2. delayedTransport terminated: stop stream-creation functionality
268
  //   2a terminating <- true
269
  //   2b loadBalancer.shutdown()
270
  //     * LoadBalancer will shutdown subchannels and OOB channels
271
  //   2c loadBalancer <- null
272
  //   2d nameResolver.shutdown()
273
  //   2e nameResolver <- null
274
  // 3. All subchannels and OOB channels terminated: Channel considered terminated
275

276
  private final AtomicBoolean shutdown = new AtomicBoolean(false);
1✔
277
  // Must only be mutated and read from syncContext
278
  private boolean shutdownNowed;
279
  // Must only be mutated from syncContext
280
  private boolean terminating;
281
  // Must be mutated from syncContext
282
  private volatile boolean terminated;
283
  private final CountDownLatch terminatedLatch = new CountDownLatch(1);
1✔
284

285
  private final CallTracer.Factory callTracerFactory;
286
  private final CallTracer channelCallTracer;
287
  private final ChannelTracer channelTracer;
288
  private final ChannelLogger channelLogger;
289
  private final InternalChannelz channelz;
290
  private final RealChannel realChannel;
291
  // Must be mutated and read from syncContext
292
  // a flag for doing channel tracing when flipped
293
  private ResolutionState lastResolutionState = ResolutionState.NO_RESOLUTION;
1✔
294
  // Must be mutated and read from constructor or syncContext
295
  // used for channel tracing when value changed
296
  private ManagedChannelServiceConfig lastServiceConfig = EMPTY_SERVICE_CONFIG;
1✔
297

298
  @Nullable
299
  private final ManagedChannelServiceConfig defaultServiceConfig;
300
  // Must be mutated and read from constructor or syncContext
301
  private boolean serviceConfigUpdated = false;
1✔
302
  private final boolean lookUpServiceConfig;
303

304
  // One instance per channel.
305
  private final ChannelBufferMeter channelBufferUsed = new ChannelBufferMeter();
1✔
306

307
  private final long perRpcBufferLimit;
308
  private final long channelBufferLimit;
309

310
  // Temporary false flag that can skip the retry code path.
311
  private final boolean retryEnabled;
312

313
  private final Deadline.Ticker ticker = Deadline.getSystemTicker();
1✔
314

315
  // Called from syncContext
316
  private final ManagedClientTransport.Listener delayedTransportListener =
1✔
317
      new DelayedTransportListener();
318

319
  // Must be called from syncContext
320
  private void maybeShutdownNowSubchannels() {
321
    if (shutdownNowed) {
1✔
322
      for (InternalSubchannel subchannel : subchannels) {
1✔
323
        subchannel.shutdownNow(SHUTDOWN_NOW_STATUS);
1✔
324
      }
1✔
325
      for (OobChannel oobChannel : oobChannels) {
1✔
326
        oobChannel.getInternalSubchannel().shutdownNow(SHUTDOWN_NOW_STATUS);
1✔
327
      }
1✔
328
    }
329
  }
1✔
330

331
  // Must be accessed from syncContext
332
  @VisibleForTesting
1✔
333
  final InUseStateAggregator<Object> inUseStateAggregator = new IdleModeStateAggregator();
334

335
  @Override
336
  public ListenableFuture<ChannelStats> getStats() {
337
    final SettableFuture<ChannelStats> ret = SettableFuture.create();
1✔
338
    final class StatsFetcher implements Runnable {
1✔
339
      @Override
340
      public void run() {
341
        ChannelStats.Builder builder = new InternalChannelz.ChannelStats.Builder();
1✔
342
        channelCallTracer.updateBuilder(builder);
1✔
343
        channelTracer.updateBuilder(builder);
1✔
344
        builder.setTarget(target).setState(channelStateManager.getState());
1✔
345
        List<InternalWithLogId> children = new ArrayList<>();
1✔
346
        children.addAll(subchannels);
1✔
347
        children.addAll(oobChannels);
1✔
348
        builder.setSubchannels(children);
1✔
349
        ret.set(builder.build());
1✔
350
      }
1✔
351
    }
352

353
    // subchannels and oobchannels can only be accessed from syncContext
354
    syncContext.execute(new StatsFetcher());
1✔
355
    return ret;
1✔
356
  }
357

358
  @Override
359
  public InternalLogId getLogId() {
360
    return logId;
1✔
361
  }
362

363
  // Run from syncContext
364
  private class IdleModeTimer implements Runnable {
1✔
365

366
    @Override
367
    public void run() {
368
      // Workaround timer scheduled while in idle mode. This can happen from handleNotInUse() after
369
      // an explicit enterIdleMode() by the user. Protecting here as other locations are a bit too
370
      // subtle to change rapidly to resolve the channel panic. See #8714
371
      if (lbHelper == null) {
1✔
372
        return;
×
373
      }
374
      enterIdleMode();
1✔
375
    }
1✔
376
  }
377

378
  // Must be called from syncContext
379
  private void shutdownNameResolverAndLoadBalancer(boolean channelIsActive) {
380
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
381
    if (channelIsActive) {
1✔
382
      checkState(nameResolverStarted, "nameResolver is not started");
1✔
383
      checkState(lbHelper != null, "lbHelper is null");
1✔
384
    }
385
    if (nameResolver != null) {
1✔
386
      nameResolver.shutdown();
1✔
387
      nameResolverStarted = false;
1✔
388
      if (channelIsActive) {
1✔
389
        nameResolver = getNameResolver(
1✔
390
            targetUri, authorityOverride, nameResolverProvider, nameResolverArgs);
391
      } else {
392
        nameResolver = null;
1✔
393
      }
394
    }
395
    if (lbHelper != null) {
1✔
396
      lbHelper.lb.shutdown();
1✔
397
      lbHelper = null;
1✔
398
    }
399
    subchannelPicker = null;
1✔
400
  }
1✔
401

402
  /**
403
   * Make the channel exit idle mode, if it's in it.
404
   *
405
   * <p>Must be called from syncContext
406
   */
407
  @VisibleForTesting
408
  void exitIdleMode() {
409
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
410
    if (shutdown.get() || panicMode) {
1✔
411
      return;
1✔
412
    }
413
    if (inUseStateAggregator.isInUse()) {
1✔
414
      // Cancel the timer now, so that a racing due timer will not put Channel on idleness
415
      // when the caller of exitIdleMode() is about to use the returned loadBalancer.
416
      cancelIdleTimer(false);
1✔
417
    } else {
418
      // exitIdleMode() may be called outside of inUseStateAggregator.handleNotInUse() while
419
      // isInUse() == false, in which case we still need to schedule the timer.
420
      rescheduleIdleTimer();
1✔
421
    }
422
    if (lbHelper != null) {
1✔
423
      return;
1✔
424
    }
425
    channelLogger.log(ChannelLogLevel.INFO, "Exiting idle mode");
1✔
426
    LbHelperImpl lbHelper = new LbHelperImpl();
1✔
427
    lbHelper.lb = loadBalancerFactory.newLoadBalancer(lbHelper);
1✔
428
    // Delay setting lbHelper until fully initialized, since loadBalancerFactory is user code and
429
    // may throw. We don't want to confuse our state, even if we will enter panic mode.
430
    this.lbHelper = lbHelper;
1✔
431

432
    channelStateManager.gotoState(CONNECTING);
1✔
433
    NameResolverListener listener = new NameResolverListener(lbHelper, nameResolver);
1✔
434
    nameResolver.start(listener);
1✔
435
    nameResolverStarted = true;
1✔
436
  }
1✔
437

438
  // Must be run from syncContext
439
  private void enterIdleMode() {
440
    // nameResolver and loadBalancer are guaranteed to be non-null.  If any of them were null,
441
    // either the idleModeTimer ran twice without exiting the idle mode, or the task in shutdown()
442
    // did not cancel idleModeTimer, or enterIdle() ran while shutdown or in idle, all of
443
    // which are bugs.
444
    shutdownNameResolverAndLoadBalancer(true);
1✔
445
    delayedTransport.reprocess(null);
1✔
446
    channelLogger.log(ChannelLogLevel.INFO, "Entering IDLE state");
1✔
447
    channelStateManager.gotoState(IDLE);
1✔
448
    // If the inUseStateAggregator still considers pending calls to be queued up or the delayed
449
    // transport to be holding some we need to exit idle mode to give these calls a chance to
450
    // be processed.
451
    if (inUseStateAggregator.anyObjectInUse(pendingCallsInUseObject, delayedTransport)) {
1✔
452
      exitIdleMode();
1✔
453
    }
454
  }
1✔
455

456
  // Must be run from syncContext
457
  private void cancelIdleTimer(boolean permanent) {
458
    idleTimer.cancel(permanent);
1✔
459
  }
1✔
460

461
  // Always run from syncContext
462
  private void rescheduleIdleTimer() {
463
    if (idleTimeoutMillis == IDLE_TIMEOUT_MILLIS_DISABLE) {
1✔
464
      return;
1✔
465
    }
466
    idleTimer.reschedule(idleTimeoutMillis, TimeUnit.MILLISECONDS);
1✔
467
  }
1✔
468

469
  /**
470
   * Force name resolution refresh to happen immediately. Must be run
471
   * from syncContext.
472
   */
473
  private void refreshNameResolution() {
474
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
475
    if (nameResolverStarted) {
1✔
476
      nameResolver.refresh();
1✔
477
    }
478
  }
1✔
479

480
  private final class ChannelStreamProvider implements ClientStreamProvider {
1✔
481
    volatile Throttle throttle;
482

483
    private ClientTransport getTransport(PickSubchannelArgs args) {
484
      SubchannelPicker pickerCopy = subchannelPicker;
1✔
485
      if (shutdown.get()) {
1✔
486
        // If channel is shut down, delayedTransport is also shut down which will fail the stream
487
        // properly.
488
        return delayedTransport;
1✔
489
      }
490
      if (pickerCopy == null) {
1✔
491
        final class ExitIdleModeForTransport implements Runnable {
1✔
492
          @Override
493
          public void run() {
494
            exitIdleMode();
1✔
495
          }
1✔
496
        }
497

498
        syncContext.execute(new ExitIdleModeForTransport());
1✔
499
        return delayedTransport;
1✔
500
      }
501
      // There is no need to reschedule the idle timer here.
502
      //
503
      // pickerCopy != null, which means idle timer has not expired when this method starts.
504
      // Even if idle timer expires right after we grab pickerCopy, and it shuts down LoadBalancer
505
      // which calls Subchannel.shutdown(), the InternalSubchannel will be actually shutdown after
506
      // SUBCHANNEL_SHUTDOWN_DELAY_SECONDS, which gives the caller time to start RPC on it.
507
      //
508
      // In most cases the idle timer is scheduled to fire after the transport has created the
509
      // stream, which would have reported in-use state to the channel that would have cancelled
510
      // the idle timer.
511
      PickResult pickResult = pickerCopy.pickSubchannel(args);
1✔
512
      ClientTransport transport = GrpcUtil.getTransportFromPickResult(
1✔
513
          pickResult, args.getCallOptions().isWaitForReady());
1✔
514
      if (transport != null) {
1✔
515
        return transport;
1✔
516
      }
517
      return delayedTransport;
1✔
518
    }
519

520
    @Override
521
    public ClientStream newStream(
522
        final MethodDescriptor<?, ?> method,
523
        final CallOptions callOptions,
524
        final Metadata headers,
525
        final Context context) {
526
      if (!retryEnabled) {
1✔
527
        ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers(
1✔
528
            callOptions, headers, 0, /* isTransparentRetry= */ false);
529
        ClientTransport transport = getTransport(new PickSubchannelArgsImpl(
1✔
530
            method, headers, callOptions, new PickDetailsConsumerImpl(tracers)));
531
        Context origContext = context.attach();
1✔
532
        try {
533
          return transport.newStream(method, headers, callOptions, tracers);
1✔
534
        } finally {
535
          context.detach(origContext);
1✔
536
        }
537
      } else {
538
        MethodInfo methodInfo = callOptions.getOption(MethodInfo.KEY);
1✔
539
        final RetryPolicy retryPolicy = methodInfo == null ? null : methodInfo.retryPolicy;
1✔
540
        final HedgingPolicy hedgingPolicy = methodInfo == null ? null : methodInfo.hedgingPolicy;
1✔
541
        final class RetryStream<ReqT> extends RetriableStream<ReqT> {
542
          @SuppressWarnings("unchecked")
543
          RetryStream() {
1✔
544
            super(
1✔
545
                (MethodDescriptor<ReqT, ?>) method,
546
                headers,
547
                channelBufferUsed,
1✔
548
                perRpcBufferLimit,
1✔
549
                channelBufferLimit,
1✔
550
                getCallExecutor(callOptions),
1✔
551
                transportFactory.getScheduledExecutorService(),
1✔
552
                retryPolicy,
553
                hedgingPolicy,
554
                throttle);
555
          }
1✔
556

557
          @Override
558
          Status prestart() {
559
            return uncommittedRetriableStreamsRegistry.add(this);
1✔
560
          }
561

562
          @Override
563
          void postCommit() {
564
            uncommittedRetriableStreamsRegistry.remove(this);
1✔
565
          }
1✔
566

567
          @Override
568
          ClientStream newSubstream(
569
              Metadata newHeaders, ClientStreamTracer.Factory factory, int previousAttempts,
570
              boolean isTransparentRetry) {
571
            CallOptions newOptions = callOptions.withStreamTracerFactory(factory);
1✔
572
            ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers(
1✔
573
                newOptions, newHeaders, previousAttempts, isTransparentRetry);
574
            ClientTransport transport = getTransport(new PickSubchannelArgsImpl(
1✔
575
                method, newHeaders, newOptions, new PickDetailsConsumerImpl(tracers)));
576
            Context origContext = context.attach();
1✔
577
            try {
578
              return transport.newStream(method, newHeaders, newOptions, tracers);
1✔
579
            } finally {
580
              context.detach(origContext);
1✔
581
            }
582
          }
583
        }
584

585
        return new RetryStream<>();
1✔
586
      }
587
    }
588
  }
589

590
  private final ChannelStreamProvider transportProvider = new ChannelStreamProvider();
1✔
591

592
  private final Rescheduler idleTimer;
593
  private final MetricRecorder metricRecorder;
594

595
  ManagedChannelImpl(
596
      ManagedChannelImplBuilder builder,
597
      ClientTransportFactory clientTransportFactory,
598
      BackoffPolicy.Provider backoffPolicyProvider,
599
      ObjectPool<? extends Executor> balancerRpcExecutorPool,
600
      Supplier<Stopwatch> stopwatchSupplier,
601
      List<ClientInterceptor> interceptors,
602
      final TimeProvider timeProvider) {
1✔
603
    this.target = checkNotNull(builder.target, "target");
1✔
604
    this.logId = InternalLogId.allocate("Channel", target);
1✔
605
    this.timeProvider = checkNotNull(timeProvider, "timeProvider");
1✔
606
    this.executorPool = checkNotNull(builder.executorPool, "executorPool");
1✔
607
    this.executor = checkNotNull(executorPool.getObject(), "executor");
1✔
608
    this.originalChannelCreds = builder.channelCredentials;
1✔
609
    this.originalTransportFactory = clientTransportFactory;
1✔
610
    this.offloadExecutorHolder =
1✔
611
        new ExecutorHolder(checkNotNull(builder.offloadExecutorPool, "offloadExecutorPool"));
1✔
612
    this.transportFactory = new CallCredentialsApplyingTransportFactory(
1✔
613
        clientTransportFactory, builder.callCredentials, this.offloadExecutorHolder);
614
    this.oobTransportFactory = new CallCredentialsApplyingTransportFactory(
1✔
615
        clientTransportFactory, null, this.offloadExecutorHolder);
616
    this.scheduledExecutor =
1✔
617
        new RestrictedScheduledExecutor(transportFactory.getScheduledExecutorService());
1✔
618
    maxTraceEvents = builder.maxTraceEvents;
1✔
619
    channelTracer = new ChannelTracer(
1✔
620
        logId, builder.maxTraceEvents, timeProvider.currentTimeNanos(),
1✔
621
        "Channel for '" + target + "'");
622
    channelLogger = new ChannelLoggerImpl(channelTracer, timeProvider);
1✔
623
    ProxyDetector proxyDetector =
624
        builder.proxyDetector != null ? builder.proxyDetector : GrpcUtil.DEFAULT_PROXY_DETECTOR;
1✔
625
    this.retryEnabled = builder.retryEnabled;
1✔
626
    this.loadBalancerFactory = new AutoConfiguredLoadBalancerFactory(builder.defaultLbPolicy);
1✔
627
    this.nameResolverRegistry = builder.nameResolverRegistry;
1✔
628
    ResolvedNameResolver resolvedResolver = getNameResolverProvider(
1✔
629
        target, nameResolverRegistry, transportFactory.getSupportedSocketAddressTypes());
1✔
630
    this.targetUri = resolvedResolver.targetUri;
1✔
631
    this.nameResolverProvider = resolvedResolver.provider;
1✔
632
    ScParser serviceConfigParser =
1✔
633
        new ScParser(
634
            retryEnabled,
635
            builder.maxRetryAttempts,
636
            builder.maxHedgedAttempts,
637
            loadBalancerFactory);
638
    this.authorityOverride = builder.authorityOverride;
1✔
639
    this.nameResolverArgs =
1✔
640
        NameResolver.Args.newBuilder()
1✔
641
            .setDefaultPort(builder.getDefaultPort())
1✔
642
            .setProxyDetector(proxyDetector)
1✔
643
            .setSynchronizationContext(syncContext)
1✔
644
            .setScheduledExecutorService(scheduledExecutor)
1✔
645
            .setServiceConfigParser(serviceConfigParser)
1✔
646
            .setChannelLogger(channelLogger)
1✔
647
            .setOffloadExecutor(this.offloadExecutorHolder)
1✔
648
            .setOverrideAuthority(this.authorityOverride)
1✔
649
            .build();
1✔
650
    this.nameResolver = getNameResolver(
1✔
651
        targetUri, authorityOverride, nameResolverProvider, nameResolverArgs);
652
    this.balancerRpcExecutorPool = checkNotNull(balancerRpcExecutorPool, "balancerRpcExecutorPool");
1✔
653
    this.balancerRpcExecutorHolder = new ExecutorHolder(balancerRpcExecutorPool);
1✔
654
    this.delayedTransport = new DelayedClientTransport(this.executor, this.syncContext);
1✔
655
    this.delayedTransport.start(delayedTransportListener);
1✔
656
    this.backoffPolicyProvider = backoffPolicyProvider;
1✔
657

658
    if (builder.defaultServiceConfig != null) {
1✔
659
      ConfigOrError parsedDefaultServiceConfig =
1✔
660
          serviceConfigParser.parseServiceConfig(builder.defaultServiceConfig);
1✔
661
      checkState(
1✔
662
          parsedDefaultServiceConfig.getError() == null,
1✔
663
          "Default config is invalid: %s",
664
          parsedDefaultServiceConfig.getError());
1✔
665
      this.defaultServiceConfig =
1✔
666
          (ManagedChannelServiceConfig) parsedDefaultServiceConfig.getConfig();
1✔
667
      this.lastServiceConfig = this.defaultServiceConfig;
1✔
668
    } else {
1✔
669
      this.defaultServiceConfig = null;
1✔
670
    }
671
    this.lookUpServiceConfig = builder.lookUpServiceConfig;
1✔
672
    realChannel = new RealChannel(nameResolver.getServiceAuthority());
1✔
673
    Channel channel = realChannel;
1✔
674
    if (builder.binlog != null) {
1✔
675
      channel = builder.binlog.wrapChannel(channel);
1✔
676
    }
677
    this.interceptorChannel = ClientInterceptors.intercept(channel, interceptors);
1✔
678
    this.transportFilters = new ArrayList<>(builder.transportFilters);
1✔
679
    this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier");
1✔
680
    if (builder.idleTimeoutMillis == IDLE_TIMEOUT_MILLIS_DISABLE) {
1✔
681
      this.idleTimeoutMillis = builder.idleTimeoutMillis;
1✔
682
    } else {
683
      checkArgument(
1✔
684
          builder.idleTimeoutMillis
685
              >= ManagedChannelImplBuilder.IDLE_MODE_MIN_TIMEOUT_MILLIS,
686
          "invalid idleTimeoutMillis %s", builder.idleTimeoutMillis);
687
      this.idleTimeoutMillis = builder.idleTimeoutMillis;
1✔
688
    }
689

690
    idleTimer = new Rescheduler(
1✔
691
        new IdleModeTimer(),
692
        syncContext,
693
        transportFactory.getScheduledExecutorService(),
1✔
694
        stopwatchSupplier.get());
1✔
695
    this.fullStreamDecompression = builder.fullStreamDecompression;
1✔
696
    this.decompressorRegistry = checkNotNull(builder.decompressorRegistry, "decompressorRegistry");
1✔
697
    this.compressorRegistry = checkNotNull(builder.compressorRegistry, "compressorRegistry");
1✔
698
    this.userAgent = builder.userAgent;
1✔
699

700
    this.channelBufferLimit = builder.retryBufferSize;
1✔
701
    this.perRpcBufferLimit = builder.perRpcBufferLimit;
1✔
702
    final class ChannelCallTracerFactory implements CallTracer.Factory {
1✔
703
      @Override
704
      public CallTracer create() {
705
        return new CallTracer(timeProvider);
1✔
706
      }
707
    }
708

709
    this.callTracerFactory = new ChannelCallTracerFactory();
1✔
710
    channelCallTracer = callTracerFactory.create();
1✔
711
    this.channelz = checkNotNull(builder.channelz);
1✔
712
    channelz.addRootChannel(this);
1✔
713

714
    if (!lookUpServiceConfig) {
1✔
715
      if (defaultServiceConfig != null) {
1✔
716
        channelLogger.log(
1✔
717
            ChannelLogLevel.INFO, "Service config look-up disabled, using default service config");
718
      }
719
      serviceConfigUpdated = true;
1✔
720
    }
721
    this.metricRecorder = new MetricRecorderImpl(builder.metricSinks,
1✔
722
        MetricInstrumentRegistry.getDefaultRegistry());
1✔
723
  }
1✔
724

725
  @VisibleForTesting
726
  static class ResolvedNameResolver {
727
    public final URI targetUri;
728
    public final NameResolverProvider provider;
729

730
    public ResolvedNameResolver(URI targetUri, NameResolverProvider provider) {
1✔
731
      this.targetUri = checkNotNull(targetUri, "targetUri");
1✔
732
      this.provider = checkNotNull(provider, "provider");
1✔
733
    }
1✔
734
  }
735

736
  @VisibleForTesting
737
  static ResolvedNameResolver getNameResolverProvider(
738
      String target, NameResolverRegistry nameResolverRegistry,
739
      Collection<Class<? extends SocketAddress>> channelTransportSocketAddressTypes) {
740
    // Finding a NameResolver. Try using the target string as the URI. If that fails, try prepending
741
    // "dns:///".
742
    NameResolverProvider provider = null;
1✔
743
    URI targetUri = null;
1✔
744
    StringBuilder uriSyntaxErrors = new StringBuilder();
1✔
745
    try {
746
      targetUri = new URI(target);
1✔
747
    } catch (URISyntaxException e) {
1✔
748
      // Can happen with ip addresses like "[::1]:1234" or 127.0.0.1:1234.
749
      uriSyntaxErrors.append(e.getMessage());
1✔
750
    }
1✔
751
    if (targetUri != null) {
1✔
752
      // For "localhost:8080" this would likely cause provider to be null, because "localhost" is
753
      // parsed as the scheme. Will hit the next case and try "dns:///localhost:8080".
754
      provider = nameResolverRegistry.getProviderForScheme(targetUri.getScheme());
1✔
755
    }
756

757
    if (provider == null && !URI_PATTERN.matcher(target).matches()) {
1✔
758
      // It doesn't look like a URI target. Maybe it's an authority string. Try with the default
759
      // scheme from the registry.
760
      try {
761
        targetUri = new URI(nameResolverRegistry.getDefaultScheme(), "", "/" + target, null);
1✔
762
      } catch (URISyntaxException e) {
×
763
        // Should not be possible.
764
        throw new IllegalArgumentException(e);
×
765
      }
1✔
766
      provider = nameResolverRegistry.getProviderForScheme(targetUri.getScheme());
1✔
767
    }
768

769
    if (provider == null) {
1✔
770
      throw new IllegalArgumentException(String.format(
1✔
771
          "Could not find a NameResolverProvider for %s%s",
772
          target, uriSyntaxErrors.length() > 0 ? " (" + uriSyntaxErrors + ")" : ""));
1✔
773
    }
774

775
    if (channelTransportSocketAddressTypes != null) {
1✔
776
      Collection<Class<? extends SocketAddress>> nameResolverSocketAddressTypes
1✔
777
          = provider.getProducedSocketAddressTypes();
1✔
778
      if (!channelTransportSocketAddressTypes.containsAll(nameResolverSocketAddressTypes)) {
1✔
779
        throw new IllegalArgumentException(String.format(
1✔
780
            "Address types of NameResolver '%s' for '%s' not supported by transport",
781
            targetUri.getScheme(), target));
1✔
782
      }
783
    }
784

785
    return new ResolvedNameResolver(targetUri, provider);
1✔
786
  }
787

788
  @VisibleForTesting
789
  static NameResolver getNameResolver(
790
      URI targetUri, @Nullable final String overrideAuthority,
791
      NameResolverProvider provider, NameResolver.Args nameResolverArgs) {
792
    NameResolver resolver = provider.newNameResolver(targetUri, nameResolverArgs);
1✔
793
    if (resolver == null) {
1✔
794
      throw new IllegalArgumentException("cannot create a NameResolver for " + targetUri);
1✔
795
    }
796

797
    // We wrap the name resolver in a RetryingNameResolver to give it the ability to retry failures.
798
    // TODO: After a transition period, all NameResolver implementations that need retry should use
799
    //       RetryingNameResolver directly and this step can be removed.
800
    NameResolver usedNameResolver = new RetryingNameResolver(resolver,
1✔
801
          new BackoffPolicyRetryScheduler(new ExponentialBackoffPolicy.Provider(),
802
              nameResolverArgs.getScheduledExecutorService(),
1✔
803
              nameResolverArgs.getSynchronizationContext()),
1✔
804
          nameResolverArgs.getSynchronizationContext());
1✔
805

806
    if (overrideAuthority == null) {
1✔
807
      return usedNameResolver;
1✔
808
    }
809

810
    return new ForwardingNameResolver(usedNameResolver) {
1✔
811
      @Override
812
      public String getServiceAuthority() {
813
        return overrideAuthority;
1✔
814
      }
815
    };
816
  }
817

818
  @VisibleForTesting
819
  InternalConfigSelector getConfigSelector() {
820
    return realChannel.configSelector.get();
1✔
821
  }
822

823
  /**
824
   * Initiates an orderly shutdown in which preexisting calls continue but new calls are immediately
825
   * cancelled.
826
   */
827
  @Override
828
  public ManagedChannelImpl shutdown() {
829
    channelLogger.log(ChannelLogLevel.DEBUG, "shutdown() called");
1✔
830
    if (!shutdown.compareAndSet(false, true)) {
1✔
831
      return this;
1✔
832
    }
833
    final class Shutdown implements Runnable {
1✔
834
      @Override
835
      public void run() {
836
        channelLogger.log(ChannelLogLevel.INFO, "Entering SHUTDOWN state");
1✔
837
        channelStateManager.gotoState(SHUTDOWN);
1✔
838
      }
1✔
839
    }
840

841
    syncContext.execute(new Shutdown());
1✔
842
    realChannel.shutdown();
1✔
843
    final class CancelIdleTimer implements Runnable {
1✔
844
      @Override
845
      public void run() {
846
        cancelIdleTimer(/* permanent= */ true);
1✔
847
      }
1✔
848
    }
849

850
    syncContext.execute(new CancelIdleTimer());
1✔
851
    return this;
1✔
852
  }
853

854
  /**
855
   * Initiates a forceful shutdown in which preexisting and new calls are cancelled. Although
856
   * forceful, the shutdown process is still not instantaneous; {@link #isTerminated()} will likely
857
   * return {@code false} immediately after this method returns.
858
   */
859
  @Override
860
  public ManagedChannelImpl shutdownNow() {
861
    channelLogger.log(ChannelLogLevel.DEBUG, "shutdownNow() called");
1✔
862
    shutdown();
1✔
863
    realChannel.shutdownNow();
1✔
864
    final class ShutdownNow implements Runnable {
1✔
865
      @Override
866
      public void run() {
867
        if (shutdownNowed) {
1✔
868
          return;
1✔
869
        }
870
        shutdownNowed = true;
1✔
871
        maybeShutdownNowSubchannels();
1✔
872
      }
1✔
873
    }
874

875
    syncContext.execute(new ShutdownNow());
1✔
876
    return this;
1✔
877
  }
878

879
  // Called from syncContext
880
  @VisibleForTesting
881
  void panic(final Throwable t) {
882
    if (panicMode) {
1✔
883
      // Preserve the first panic information
884
      return;
×
885
    }
886
    panicMode = true;
1✔
887
    cancelIdleTimer(/* permanent= */ true);
1✔
888
    shutdownNameResolverAndLoadBalancer(false);
1✔
889
    final class PanicSubchannelPicker extends SubchannelPicker {
1✔
890
      private final PickResult panicPickResult =
1✔
891
          PickResult.withDrop(
1✔
892
              Status.INTERNAL.withDescription("Panic! This is a bug!").withCause(t));
1✔
893

894
      @Override
895
      public PickResult pickSubchannel(PickSubchannelArgs args) {
896
        return panicPickResult;
1✔
897
      }
898

899
      @Override
900
      public String toString() {
901
        return MoreObjects.toStringHelper(PanicSubchannelPicker.class)
×
902
            .add("panicPickResult", panicPickResult)
×
903
            .toString();
×
904
      }
905
    }
906

907
    updateSubchannelPicker(new PanicSubchannelPicker());
1✔
908
    realChannel.updateConfigSelector(null);
1✔
909
    channelLogger.log(ChannelLogLevel.ERROR, "PANIC! Entering TRANSIENT_FAILURE");
1✔
910
    channelStateManager.gotoState(TRANSIENT_FAILURE);
1✔
911
  }
1✔
912

913
  @VisibleForTesting
914
  boolean isInPanicMode() {
915
    return panicMode;
1✔
916
  }
917

918
  // Called from syncContext
919
  private void updateSubchannelPicker(SubchannelPicker newPicker) {
920
    subchannelPicker = newPicker;
1✔
921
    delayedTransport.reprocess(newPicker);
1✔
922
  }
1✔
923

924
  @Override
925
  public boolean isShutdown() {
926
    return shutdown.get();
1✔
927
  }
928

929
  @Override
930
  public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
931
    return terminatedLatch.await(timeout, unit);
1✔
932
  }
933

934
  @Override
935
  public boolean isTerminated() {
936
    return terminated;
1✔
937
  }
938

939
  /*
940
   * Creates a new outgoing call on the channel.
941
   */
942
  @Override
943
  public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(MethodDescriptor<ReqT, RespT> method,
944
      CallOptions callOptions) {
945
    return interceptorChannel.newCall(method, callOptions);
1✔
946
  }
947

948
  @Override
949
  public String authority() {
950
    return interceptorChannel.authority();
1✔
951
  }
952

953
  private Executor getCallExecutor(CallOptions callOptions) {
954
    Executor executor = callOptions.getExecutor();
1✔
955
    if (executor == null) {
1✔
956
      executor = this.executor;
1✔
957
    }
958
    return executor;
1✔
959
  }
960

961
  private class RealChannel extends Channel {
962
    // Reference to null if no config selector is available from resolution result
963
    // Reference must be set() from syncContext
964
    private final AtomicReference<InternalConfigSelector> configSelector =
1✔
965
        new AtomicReference<>(INITIAL_PENDING_SELECTOR);
1✔
966
    // Set when the NameResolver is initially created. When we create a new NameResolver for the
967
    // same target, the new instance must have the same value.
968
    private final String authority;
969

970
    private final Channel clientCallImplChannel = new Channel() {
1✔
971
      @Override
972
      public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
973
          MethodDescriptor<RequestT, ResponseT> method, CallOptions callOptions) {
974
        return new ClientCallImpl<>(
1✔
975
            method,
976
            getCallExecutor(callOptions),
1✔
977
            callOptions,
978
            transportProvider,
1✔
979
            terminated ? null : transportFactory.getScheduledExecutorService(),
1✔
980
            channelCallTracer,
1✔
981
            null)
982
            .setFullStreamDecompression(fullStreamDecompression)
1✔
983
            .setDecompressorRegistry(decompressorRegistry)
1✔
984
            .setCompressorRegistry(compressorRegistry);
1✔
985
      }
986

987
      @Override
988
      public String authority() {
989
        return authority;
×
990
      }
991
    };
992

993
    private RealChannel(String authority) {
1✔
994
      this.authority =  checkNotNull(authority, "authority");
1✔
995
    }
1✔
996

997
    @Override
998
    public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(
999
        MethodDescriptor<ReqT, RespT> method, CallOptions callOptions) {
1000
      if (configSelector.get() != INITIAL_PENDING_SELECTOR) {
1✔
1001
        return newClientCall(method, callOptions);
1✔
1002
      }
1003
      syncContext.execute(new Runnable() {
1✔
1004
        @Override
1005
        public void run() {
1006
          exitIdleMode();
1✔
1007
        }
1✔
1008
      });
1009
      if (configSelector.get() != INITIAL_PENDING_SELECTOR) {
1✔
1010
        // This is an optimization for the case (typically with InProcessTransport) when name
1011
        // resolution result is immediately available at this point. Otherwise, some users'
1012
        // tests might observe slight behavior difference from earlier grpc versions.
1013
        return newClientCall(method, callOptions);
1✔
1014
      }
1015
      if (shutdown.get()) {
1✔
1016
        // Return a failing ClientCall.
1017
        return new ClientCall<ReqT, RespT>() {
×
1018
          @Override
1019
          public void start(Listener<RespT> responseListener, Metadata headers) {
1020
            responseListener.onClose(SHUTDOWN_STATUS, new Metadata());
×
1021
          }
×
1022

1023
          @Override public void request(int numMessages) {}
×
1024

1025
          @Override public void cancel(@Nullable String message, @Nullable Throwable cause) {}
×
1026

1027
          @Override public void halfClose() {}
×
1028

1029
          @Override public void sendMessage(ReqT message) {}
×
1030
        };
1031
      }
1032
      Context context = Context.current();
1✔
1033
      final PendingCall<ReqT, RespT> pendingCall = new PendingCall<>(context, method, callOptions);
1✔
1034
      syncContext.execute(new Runnable() {
1✔
1035
        @Override
1036
        public void run() {
1037
          if (configSelector.get() == INITIAL_PENDING_SELECTOR) {
1✔
1038
            if (pendingCalls == null) {
1✔
1039
              pendingCalls = new LinkedHashSet<>();
1✔
1040
              inUseStateAggregator.updateObjectInUse(pendingCallsInUseObject, true);
1✔
1041
            }
1042
            pendingCalls.add(pendingCall);
1✔
1043
          } else {
1044
            pendingCall.reprocess();
1✔
1045
          }
1046
        }
1✔
1047
      });
1048
      return pendingCall;
1✔
1049
    }
1050

1051
    // Must run in SynchronizationContext.
1052
    void updateConfigSelector(@Nullable InternalConfigSelector config) {
1053
      InternalConfigSelector prevConfig = configSelector.get();
1✔
1054
      configSelector.set(config);
1✔
1055
      if (prevConfig == INITIAL_PENDING_SELECTOR && pendingCalls != null) {
1✔
1056
        for (RealChannel.PendingCall<?, ?> pendingCall : pendingCalls) {
1✔
1057
          pendingCall.reprocess();
1✔
1058
        }
1✔
1059
      }
1060
    }
1✔
1061

1062
    // Must run in SynchronizationContext.
1063
    void onConfigError() {
1064
      if (configSelector.get() == INITIAL_PENDING_SELECTOR) {
1✔
1065
        updateConfigSelector(null);
1✔
1066
      }
1067
    }
1✔
1068

1069
    void shutdown() {
1070
      final class RealChannelShutdown implements Runnable {
1✔
1071
        @Override
1072
        public void run() {
1073
          if (pendingCalls == null) {
1✔
1074
            if (configSelector.get() == INITIAL_PENDING_SELECTOR) {
1✔
1075
              configSelector.set(null);
1✔
1076
            }
1077
            uncommittedRetriableStreamsRegistry.onShutdown(SHUTDOWN_STATUS);
1✔
1078
          }
1079
        }
1✔
1080
      }
1081

1082
      syncContext.execute(new RealChannelShutdown());
1✔
1083
    }
1✔
1084

1085
    void shutdownNow() {
1086
      final class RealChannelShutdownNow implements Runnable {
1✔
1087
        @Override
1088
        public void run() {
1089
          if (configSelector.get() == INITIAL_PENDING_SELECTOR) {
1✔
1090
            configSelector.set(null);
1✔
1091
          }
1092
          if (pendingCalls != null) {
1✔
1093
            for (RealChannel.PendingCall<?, ?> pendingCall : pendingCalls) {
1✔
1094
              pendingCall.cancel("Channel is forcefully shutdown", null);
1✔
1095
            }
1✔
1096
          }
1097
          uncommittedRetriableStreamsRegistry.onShutdownNow(SHUTDOWN_NOW_STATUS);
1✔
1098
        }
1✔
1099
      }
1100

1101
      syncContext.execute(new RealChannelShutdownNow());
1✔
1102
    }
1✔
1103

1104
    @Override
1105
    public String authority() {
1106
      return authority;
1✔
1107
    }
1108

1109
    private final class PendingCall<ReqT, RespT> extends DelayedClientCall<ReqT, RespT> {
1110
      final Context context;
1111
      final MethodDescriptor<ReqT, RespT> method;
1112
      final CallOptions callOptions;
1113
      private final long callCreationTime;
1114

1115
      PendingCall(
1116
          Context context, MethodDescriptor<ReqT, RespT> method, CallOptions callOptions) {
1✔
1117
        super(getCallExecutor(callOptions), scheduledExecutor, callOptions.getDeadline());
1✔
1118
        this.context = context;
1✔
1119
        this.method = method;
1✔
1120
        this.callOptions = callOptions;
1✔
1121
        this.callCreationTime = ticker.nanoTime();
1✔
1122
      }
1✔
1123

1124
      /** Called when it's ready to create a real call and reprocess the pending call. */
1125
      void reprocess() {
1126
        ClientCall<ReqT, RespT> realCall;
1127
        Context previous = context.attach();
1✔
1128
        try {
1129
          CallOptions delayResolutionOption = callOptions.withOption(NAME_RESOLUTION_DELAYED,
1✔
1130
              ticker.nanoTime() - callCreationTime);
1✔
1131
          realCall = newClientCall(method, delayResolutionOption);
1✔
1132
        } finally {
1133
          context.detach(previous);
1✔
1134
        }
1135
        Runnable toRun = setCall(realCall);
1✔
1136
        if (toRun == null) {
1✔
1137
          syncContext.execute(new PendingCallRemoval());
1✔
1138
        } else {
1139
          getCallExecutor(callOptions).execute(new Runnable() {
1✔
1140
            @Override
1141
            public void run() {
1142
              toRun.run();
1✔
1143
              syncContext.execute(new PendingCallRemoval());
1✔
1144
            }
1✔
1145
          });
1146
        }
1147
      }
1✔
1148

1149
      @Override
1150
      protected void callCancelled() {
1151
        super.callCancelled();
1✔
1152
        syncContext.execute(new PendingCallRemoval());
1✔
1153
      }
1✔
1154

1155
      final class PendingCallRemoval implements Runnable {
1✔
1156
        @Override
1157
        public void run() {
1158
          if (pendingCalls != null) {
1✔
1159
            pendingCalls.remove(PendingCall.this);
1✔
1160
            if (pendingCalls.isEmpty()) {
1✔
1161
              inUseStateAggregator.updateObjectInUse(pendingCallsInUseObject, false);
1✔
1162
              pendingCalls = null;
1✔
1163
              if (shutdown.get()) {
1✔
1164
                uncommittedRetriableStreamsRegistry.onShutdown(SHUTDOWN_STATUS);
1✔
1165
              }
1166
            }
1167
          }
1168
        }
1✔
1169
      }
1170
    }
1171

1172
    private <ReqT, RespT> ClientCall<ReqT, RespT> newClientCall(
1173
        MethodDescriptor<ReqT, RespT> method, CallOptions callOptions) {
1174
      InternalConfigSelector selector = configSelector.get();
1✔
1175
      if (selector == null) {
1✔
1176
        return clientCallImplChannel.newCall(method, callOptions);
1✔
1177
      }
1178
      if (selector instanceof ServiceConfigConvertedSelector) {
1✔
1179
        MethodInfo methodInfo =
1✔
1180
            ((ServiceConfigConvertedSelector) selector).config.getMethodConfig(method);
1✔
1181
        if (methodInfo != null) {
1✔
1182
          callOptions = callOptions.withOption(MethodInfo.KEY, methodInfo);
1✔
1183
        }
1184
        return clientCallImplChannel.newCall(method, callOptions);
1✔
1185
      }
1186
      return new ConfigSelectingClientCall<>(
1✔
1187
          selector, clientCallImplChannel, executor, method, callOptions);
1✔
1188
    }
1189
  }
1190

1191
  /**
1192
   * A client call for a given channel that applies a given config selector when it starts.
1193
   */
1194
  static final class ConfigSelectingClientCall<ReqT, RespT>
1195
      extends ForwardingClientCall<ReqT, RespT> {
1196

1197
    private final InternalConfigSelector configSelector;
1198
    private final Channel channel;
1199
    private final Executor callExecutor;
1200
    private final MethodDescriptor<ReqT, RespT> method;
1201
    private final Context context;
1202
    private CallOptions callOptions;
1203

1204
    private ClientCall<ReqT, RespT> delegate;
1205

1206
    ConfigSelectingClientCall(
1207
        InternalConfigSelector configSelector, Channel channel, Executor channelExecutor,
1208
        MethodDescriptor<ReqT, RespT> method,
1209
        CallOptions callOptions) {
1✔
1210
      this.configSelector = configSelector;
1✔
1211
      this.channel = channel;
1✔
1212
      this.method = method;
1✔
1213
      this.callExecutor =
1✔
1214
          callOptions.getExecutor() == null ? channelExecutor : callOptions.getExecutor();
1✔
1215
      this.callOptions = callOptions.withExecutor(callExecutor);
1✔
1216
      this.context = Context.current();
1✔
1217
    }
1✔
1218

1219
    @Override
1220
    protected ClientCall<ReqT, RespT> delegate() {
1221
      return delegate;
1✔
1222
    }
1223

1224
    @SuppressWarnings("unchecked")
1225
    @Override
1226
    public void start(Listener<RespT> observer, Metadata headers) {
1227
      PickSubchannelArgs args =
1✔
1228
          new PickSubchannelArgsImpl(method, headers, callOptions, NOOP_PICK_DETAILS_CONSUMER);
1✔
1229
      InternalConfigSelector.Result result = configSelector.selectConfig(args);
1✔
1230
      Status status = result.getStatus();
1✔
1231
      if (!status.isOk()) {
1✔
1232
        executeCloseObserverInContext(observer,
1✔
1233
            GrpcUtil.replaceInappropriateControlPlaneStatus(status));
1✔
1234
        delegate = (ClientCall<ReqT, RespT>) NOOP_CALL;
1✔
1235
        return;
1✔
1236
      }
1237
      ClientInterceptor interceptor = result.getInterceptor();
1✔
1238
      ManagedChannelServiceConfig config = (ManagedChannelServiceConfig) result.getConfig();
1✔
1239
      MethodInfo methodInfo = config.getMethodConfig(method);
1✔
1240
      if (methodInfo != null) {
1✔
1241
        callOptions = callOptions.withOption(MethodInfo.KEY, methodInfo);
1✔
1242
      }
1243
      if (interceptor != null) {
1✔
1244
        delegate = interceptor.interceptCall(method, callOptions, channel);
1✔
1245
      } else {
1246
        delegate = channel.newCall(method, callOptions);
×
1247
      }
1248
      delegate.start(observer, headers);
1✔
1249
    }
1✔
1250

1251
    private void executeCloseObserverInContext(
1252
        final Listener<RespT> observer, final Status status) {
1253
      class CloseInContext extends ContextRunnable {
1254
        CloseInContext() {
1✔
1255
          super(context);
1✔
1256
        }
1✔
1257

1258
        @Override
1259
        public void runInContext() {
1260
          observer.onClose(status, new Metadata());
1✔
1261
        }
1✔
1262
      }
1263

1264
      callExecutor.execute(new CloseInContext());
1✔
1265
    }
1✔
1266

1267
    @Override
1268
    public void cancel(@Nullable String message, @Nullable Throwable cause) {
1269
      if (delegate != null) {
×
1270
        delegate.cancel(message, cause);
×
1271
      }
1272
    }
×
1273
  }
1274

1275
  private static final ClientCall<Object, Object> NOOP_CALL = new ClientCall<Object, Object>() {
1✔
1276
    @Override
1277
    public void start(Listener<Object> responseListener, Metadata headers) {}
×
1278

1279
    @Override
1280
    public void request(int numMessages) {}
1✔
1281

1282
    @Override
1283
    public void cancel(String message, Throwable cause) {}
×
1284

1285
    @Override
1286
    public void halfClose() {}
×
1287

1288
    @Override
1289
    public void sendMessage(Object message) {}
×
1290

1291
    // Always returns {@code false}, since this is only used when the startup of the call fails.
1292
    @Override
1293
    public boolean isReady() {
1294
      return false;
×
1295
    }
1296
  };
1297

1298
  /**
1299
   * Terminate the channel if termination conditions are met.
1300
   */
1301
  // Must be run from syncContext
1302
  private void maybeTerminateChannel() {
1303
    if (terminated) {
1✔
1304
      return;
×
1305
    }
1306
    if (shutdown.get() && subchannels.isEmpty() && oobChannels.isEmpty()) {
1✔
1307
      channelLogger.log(ChannelLogLevel.INFO, "Terminated");
1✔
1308
      channelz.removeRootChannel(this);
1✔
1309
      executorPool.returnObject(executor);
1✔
1310
      balancerRpcExecutorHolder.release();
1✔
1311
      offloadExecutorHolder.release();
1✔
1312
      // Release the transport factory so that it can deallocate any resources.
1313
      transportFactory.close();
1✔
1314

1315
      terminated = true;
1✔
1316
      terminatedLatch.countDown();
1✔
1317
    }
1318
  }
1✔
1319

1320
  // Must be called from syncContext
1321
  private void handleInternalSubchannelState(ConnectivityStateInfo newState) {
1322
    if (newState.getState() == TRANSIENT_FAILURE || newState.getState() == IDLE) {
1✔
1323
      refreshNameResolution();
1✔
1324
    }
1325
  }
1✔
1326

1327
  @Override
1328
  @SuppressWarnings("deprecation")
1329
  public ConnectivityState getState(boolean requestConnection) {
1330
    ConnectivityState savedChannelState = channelStateManager.getState();
1✔
1331
    if (requestConnection && savedChannelState == IDLE) {
1✔
1332
      final class RequestConnection implements Runnable {
1✔
1333
        @Override
1334
        public void run() {
1335
          exitIdleMode();
1✔
1336
          if (subchannelPicker != null) {
1✔
1337
            subchannelPicker.requestConnection();
1✔
1338
          }
1339
          if (lbHelper != null) {
1✔
1340
            lbHelper.lb.requestConnection();
1✔
1341
          }
1342
        }
1✔
1343
      }
1344

1345
      syncContext.execute(new RequestConnection());
1✔
1346
    }
1347
    return savedChannelState;
1✔
1348
  }
1349

1350
  @Override
1351
  public void notifyWhenStateChanged(final ConnectivityState source, final Runnable callback) {
1352
    final class NotifyStateChanged implements Runnable {
1✔
1353
      @Override
1354
      public void run() {
1355
        channelStateManager.notifyWhenStateChanged(callback, executor, source);
1✔
1356
      }
1✔
1357
    }
1358

1359
    syncContext.execute(new NotifyStateChanged());
1✔
1360
  }
1✔
1361

1362
  @Override
1363
  public void resetConnectBackoff() {
1364
    final class ResetConnectBackoff implements Runnable {
1✔
1365
      @Override
1366
      public void run() {
1367
        if (shutdown.get()) {
1✔
1368
          return;
1✔
1369
        }
1370
        if (nameResolverStarted) {
1✔
1371
          refreshNameResolution();
1✔
1372
        }
1373
        for (InternalSubchannel subchannel : subchannels) {
1✔
1374
          subchannel.resetConnectBackoff();
1✔
1375
        }
1✔
1376
        for (OobChannel oobChannel : oobChannels) {
1✔
1377
          oobChannel.resetConnectBackoff();
×
1378
        }
×
1379
      }
1✔
1380
    }
1381

1382
    syncContext.execute(new ResetConnectBackoff());
1✔
1383
  }
1✔
1384

1385
  @Override
1386
  public void enterIdle() {
1387
    final class PrepareToLoseNetworkRunnable implements Runnable {
1✔
1388
      @Override
1389
      public void run() {
1390
        if (shutdown.get() || lbHelper == null) {
1✔
1391
          return;
1✔
1392
        }
1393
        cancelIdleTimer(/* permanent= */ false);
1✔
1394
        enterIdleMode();
1✔
1395
      }
1✔
1396
    }
1397

1398
    syncContext.execute(new PrepareToLoseNetworkRunnable());
1✔
1399
  }
1✔
1400

1401
  /**
1402
   * A registry that prevents channel shutdown from killing existing retry attempts that are in
1403
   * backoff.
1404
   */
1405
  private final class UncommittedRetriableStreamsRegistry {
1✔
1406
    // TODO(zdapeng): This means we would acquire a lock for each new retry-able stream,
1407
    // it's worthwhile to look for a lock-free approach.
1408
    final Object lock = new Object();
1✔
1409

1410
    @GuardedBy("lock")
1✔
1411
    Collection<ClientStream> uncommittedRetriableStreams = new HashSet<>();
1412

1413
    @GuardedBy("lock")
1414
    Status shutdownStatus;
1415

1416
    void onShutdown(Status reason) {
1417
      boolean shouldShutdownDelayedTransport = false;
1✔
1418
      synchronized (lock) {
1✔
1419
        if (shutdownStatus != null) {
1✔
1420
          return;
1✔
1421
        }
1422
        shutdownStatus = reason;
1✔
1423
        // Keep the delayedTransport open until there is no more uncommitted streams, b/c those
1424
        // retriable streams, which may be in backoff and not using any transport, are already
1425
        // started RPCs.
1426
        if (uncommittedRetriableStreams.isEmpty()) {
1✔
1427
          shouldShutdownDelayedTransport = true;
1✔
1428
        }
1429
      }
1✔
1430

1431
      if (shouldShutdownDelayedTransport) {
1✔
1432
        delayedTransport.shutdown(reason);
1✔
1433
      }
1434
    }
1✔
1435

1436
    void onShutdownNow(Status reason) {
1437
      onShutdown(reason);
1✔
1438
      Collection<ClientStream> streams;
1439

1440
      synchronized (lock) {
1✔
1441
        streams = new ArrayList<>(uncommittedRetriableStreams);
1✔
1442
      }
1✔
1443

1444
      for (ClientStream stream : streams) {
1✔
1445
        stream.cancel(reason);
1✔
1446
      }
1✔
1447
      delayedTransport.shutdownNow(reason);
1✔
1448
    }
1✔
1449

1450
    /**
1451
     * Registers a RetriableStream and return null if not shutdown, otherwise just returns the
1452
     * shutdown Status.
1453
     */
1454
    @Nullable
1455
    Status add(RetriableStream<?> retriableStream) {
1456
      synchronized (lock) {
1✔
1457
        if (shutdownStatus != null) {
1✔
1458
          return shutdownStatus;
1✔
1459
        }
1460
        uncommittedRetriableStreams.add(retriableStream);
1✔
1461
        return null;
1✔
1462
      }
1463
    }
1464

1465
    void remove(RetriableStream<?> retriableStream) {
1466
      Status shutdownStatusCopy = null;
1✔
1467

1468
      synchronized (lock) {
1✔
1469
        uncommittedRetriableStreams.remove(retriableStream);
1✔
1470
        if (uncommittedRetriableStreams.isEmpty()) {
1✔
1471
          shutdownStatusCopy = shutdownStatus;
1✔
1472
          // Because retriable transport is long-lived, we take this opportunity to down-size the
1473
          // hashmap.
1474
          uncommittedRetriableStreams = new HashSet<>();
1✔
1475
        }
1476
      }
1✔
1477

1478
      if (shutdownStatusCopy != null) {
1✔
1479
        delayedTransport.shutdown(shutdownStatusCopy);
1✔
1480
      }
1481
    }
1✔
1482
  }
1483

1484
  private final class LbHelperImpl extends LoadBalancer.Helper {
1✔
1485
    AutoConfiguredLoadBalancer lb;
1486

1487
    @Override
1488
    public AbstractSubchannel createSubchannel(CreateSubchannelArgs args) {
1489
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1490
      // No new subchannel should be created after load balancer has been shutdown.
1491
      checkState(!terminating, "Channel is being terminated");
1✔
1492
      return new SubchannelImpl(args);
1✔
1493
    }
1494

1495
    @Override
1496
    public void updateBalancingState(
1497
        final ConnectivityState newState, final SubchannelPicker newPicker) {
1498
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1499
      checkNotNull(newState, "newState");
1✔
1500
      checkNotNull(newPicker, "newPicker");
1✔
1501
      final class UpdateBalancingState implements Runnable {
1✔
1502
        @Override
1503
        public void run() {
1504
          if (LbHelperImpl.this != lbHelper) {
1✔
1505
            return;
1✔
1506
          }
1507
          updateSubchannelPicker(newPicker);
1✔
1508
          // It's not appropriate to report SHUTDOWN state from lb.
1509
          // Ignore the case of newState == SHUTDOWN for now.
1510
          if (newState != SHUTDOWN) {
1✔
1511
            channelLogger.log(
1✔
1512
                ChannelLogLevel.INFO, "Entering {0} state with picker: {1}", newState, newPicker);
1513
            channelStateManager.gotoState(newState);
1✔
1514
          }
1515
        }
1✔
1516
      }
1517

1518
      syncContext.execute(new UpdateBalancingState());
1✔
1519
    }
1✔
1520

1521
    @Override
1522
    public void refreshNameResolution() {
1523
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1524
      final class LoadBalancerRefreshNameResolution implements Runnable {
1✔
1525
        @Override
1526
        public void run() {
1527
          ManagedChannelImpl.this.refreshNameResolution();
1✔
1528
        }
1✔
1529
      }
1530

1531
      syncContext.execute(new LoadBalancerRefreshNameResolution());
1✔
1532
    }
1✔
1533

1534
    @Override
1535
    public ManagedChannel createOobChannel(EquivalentAddressGroup addressGroup, String authority) {
1536
      return createOobChannel(Collections.singletonList(addressGroup), authority);
×
1537
    }
1538

1539
    @Override
1540
    public ManagedChannel createOobChannel(List<EquivalentAddressGroup> addressGroup,
1541
        String authority) {
1542
      // TODO(ejona): can we be even stricter? Like terminating?
1543
      checkState(!terminated, "Channel is terminated");
1✔
1544
      long oobChannelCreationTime = timeProvider.currentTimeNanos();
1✔
1545
      InternalLogId oobLogId = InternalLogId.allocate("OobChannel", /*details=*/ null);
1✔
1546
      InternalLogId subchannelLogId =
1✔
1547
          InternalLogId.allocate("Subchannel-OOB", /*details=*/ authority);
1✔
1548
      ChannelTracer oobChannelTracer =
1✔
1549
          new ChannelTracer(
1550
              oobLogId, maxTraceEvents, oobChannelCreationTime,
1✔
1551
              "OobChannel for " + addressGroup);
1552
      final OobChannel oobChannel = new OobChannel(
1✔
1553
          authority, balancerRpcExecutorPool, oobTransportFactory.getScheduledExecutorService(),
1✔
1554
          syncContext, callTracerFactory.create(), oobChannelTracer, channelz, timeProvider);
1✔
1555
      channelTracer.reportEvent(new ChannelTrace.Event.Builder()
1✔
1556
          .setDescription("Child OobChannel created")
1✔
1557
          .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
1✔
1558
          .setTimestampNanos(oobChannelCreationTime)
1✔
1559
          .setChannelRef(oobChannel)
1✔
1560
          .build());
1✔
1561
      ChannelTracer subchannelTracer =
1✔
1562
          new ChannelTracer(subchannelLogId, maxTraceEvents, oobChannelCreationTime,
1✔
1563
              "Subchannel for " + addressGroup);
1564
      ChannelLogger subchannelLogger = new ChannelLoggerImpl(subchannelTracer, timeProvider);
1✔
1565
      final class ManagedOobChannelCallback extends InternalSubchannel.Callback {
1✔
1566
        @Override
1567
        void onTerminated(InternalSubchannel is) {
1568
          oobChannels.remove(oobChannel);
1✔
1569
          channelz.removeSubchannel(is);
1✔
1570
          oobChannel.handleSubchannelTerminated();
1✔
1571
          maybeTerminateChannel();
1✔
1572
        }
1✔
1573

1574
        @Override
1575
        void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) {
1576
          // TODO(chengyuanzhang): change to let LB policies explicitly manage OOB channel's
1577
          //  state and refresh name resolution if necessary.
1578
          handleInternalSubchannelState(newState);
1✔
1579
          oobChannel.handleSubchannelStateChange(newState);
1✔
1580
        }
1✔
1581
      }
1582

1583
      final InternalSubchannel internalSubchannel = new InternalSubchannel(
1✔
1584
          addressGroup,
1585
          authority, userAgent, backoffPolicyProvider, oobTransportFactory,
1✔
1586
          oobTransportFactory.getScheduledExecutorService(), stopwatchSupplier, syncContext,
1✔
1587
          // All callback methods are run from syncContext
1588
          new ManagedOobChannelCallback(),
1589
          channelz,
1✔
1590
          callTracerFactory.create(),
1✔
1591
          subchannelTracer,
1592
          subchannelLogId,
1593
          subchannelLogger,
1594
          transportFilters);
1✔
1595
      oobChannelTracer.reportEvent(new ChannelTrace.Event.Builder()
1✔
1596
          .setDescription("Child Subchannel created")
1✔
1597
          .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
1✔
1598
          .setTimestampNanos(oobChannelCreationTime)
1✔
1599
          .setSubchannelRef(internalSubchannel)
1✔
1600
          .build());
1✔
1601
      channelz.addSubchannel(oobChannel);
1✔
1602
      channelz.addSubchannel(internalSubchannel);
1✔
1603
      oobChannel.setSubchannel(internalSubchannel);
1✔
1604
      final class AddOobChannel implements Runnable {
1✔
1605
        @Override
1606
        public void run() {
1607
          if (terminating) {
1✔
1608
            oobChannel.shutdown();
×
1609
          }
1610
          if (!terminated) {
1✔
1611
            // If channel has not terminated, it will track the subchannel and block termination
1612
            // for it.
1613
            oobChannels.add(oobChannel);
1✔
1614
          }
1615
        }
1✔
1616
      }
1617

1618
      syncContext.execute(new AddOobChannel());
1✔
1619
      return oobChannel;
1✔
1620
    }
1621

1622
    @Deprecated
1623
    @Override
1624
    public ManagedChannelBuilder<?> createResolvingOobChannelBuilder(String target) {
1625
      return createResolvingOobChannelBuilder(target, new DefaultChannelCreds())
1✔
1626
          // Override authority to keep the old behavior.
1627
          // createResolvingOobChannelBuilder(String target) will be deleted soon.
1628
          .overrideAuthority(getAuthority());
1✔
1629
    }
1630

1631
    // TODO(creamsoup) prevent main channel to shutdown if oob channel is not terminated
1632
    // TODO(zdapeng) register the channel as a subchannel of the parent channel in channelz.
1633
    @Override
1634
    public ManagedChannelBuilder<?> createResolvingOobChannelBuilder(
1635
        final String target, final ChannelCredentials channelCreds) {
1636
      checkNotNull(channelCreds, "channelCreds");
1✔
1637

1638
      final class ResolvingOobChannelBuilder
1639
          extends ForwardingChannelBuilder2<ResolvingOobChannelBuilder> {
1640
        final ManagedChannelBuilder<?> delegate;
1641

1642
        ResolvingOobChannelBuilder() {
1✔
1643
          final ClientTransportFactory transportFactory;
1644
          CallCredentials callCredentials;
1645
          if (channelCreds instanceof DefaultChannelCreds) {
1✔
1646
            transportFactory = originalTransportFactory;
1✔
1647
            callCredentials = null;
1✔
1648
          } else {
1649
            SwapChannelCredentialsResult swapResult =
1✔
1650
                originalTransportFactory.swapChannelCredentials(channelCreds);
1✔
1651
            if (swapResult == null) {
1✔
1652
              delegate = Grpc.newChannelBuilder(target, channelCreds);
×
1653
              return;
×
1654
            } else {
1655
              transportFactory = swapResult.transportFactory;
1✔
1656
              callCredentials = swapResult.callCredentials;
1✔
1657
            }
1658
          }
1659
          ClientTransportFactoryBuilder transportFactoryBuilder =
1✔
1660
              new ClientTransportFactoryBuilder() {
1✔
1661
                @Override
1662
                public ClientTransportFactory buildClientTransportFactory() {
1663
                  return transportFactory;
1✔
1664
                }
1665
              };
1666
          delegate = new ManagedChannelImplBuilder(
1✔
1667
              target,
1668
              channelCreds,
1669
              callCredentials,
1670
              transportFactoryBuilder,
1671
              new FixedPortProvider(nameResolverArgs.getDefaultPort()))
1✔
1672
              .nameResolverRegistry(nameResolverRegistry);
1✔
1673
        }
1✔
1674

1675
        @Override
1676
        protected ManagedChannelBuilder<?> delegate() {
1677
          return delegate;
1✔
1678
        }
1679
      }
1680

1681
      checkState(!terminated, "Channel is terminated");
1✔
1682

1683
      @SuppressWarnings("deprecation")
1684
      ResolvingOobChannelBuilder builder = new ResolvingOobChannelBuilder();
1✔
1685

1686
      return builder
1✔
1687
          // TODO(zdapeng): executors should not outlive the parent channel.
1688
          .executor(executor)
1✔
1689
          .offloadExecutor(offloadExecutorHolder.getExecutor())
1✔
1690
          .maxTraceEvents(maxTraceEvents)
1✔
1691
          .proxyDetector(nameResolverArgs.getProxyDetector())
1✔
1692
          .userAgent(userAgent);
1✔
1693
    }
1694

1695
    @Override
1696
    public ChannelCredentials getUnsafeChannelCredentials() {
1697
      if (originalChannelCreds == null) {
×
1698
        return new DefaultChannelCreds();
×
1699
      }
1700
      return originalChannelCreds;
×
1701
    }
1702

1703
    @Override
1704
    public void updateOobChannelAddresses(ManagedChannel channel, EquivalentAddressGroup eag) {
1705
      updateOobChannelAddresses(channel, Collections.singletonList(eag));
×
1706
    }
×
1707

1708
    @Override
1709
    public void updateOobChannelAddresses(ManagedChannel channel,
1710
        List<EquivalentAddressGroup> eag) {
1711
      checkArgument(channel instanceof OobChannel,
1✔
1712
          "channel must have been returned from createOobChannel");
1713
      ((OobChannel) channel).updateAddresses(eag);
1✔
1714
    }
1✔
1715

1716
    @Override
1717
    public String getAuthority() {
1718
      return ManagedChannelImpl.this.authority();
1✔
1719
    }
1720

1721
    @Override
1722
    public String getChannelTarget() {
1723
      return targetUri.toString();
×
1724
    }
1725

1726
    @Override
1727
    public SynchronizationContext getSynchronizationContext() {
1728
      return syncContext;
1✔
1729
    }
1730

1731
    @Override
1732
    public ScheduledExecutorService getScheduledExecutorService() {
1733
      return scheduledExecutor;
1✔
1734
    }
1735

1736
    @Override
1737
    public ChannelLogger getChannelLogger() {
1738
      return channelLogger;
1✔
1739
    }
1740

1741
    @Override
1742
    public NameResolver.Args getNameResolverArgs() {
1743
      return nameResolverArgs;
1✔
1744
    }
1745

1746
    @Override
1747
    public NameResolverRegistry getNameResolverRegistry() {
1748
      return nameResolverRegistry;
1✔
1749
    }
1750

1751
    @Override
1752
    public MetricRecorder getMetricRecorder() {
1753
      return metricRecorder;
1✔
1754
    }
1755

1756
    /**
1757
     * A placeholder for channel creds if user did not specify channel creds for the channel.
1758
     */
1759
    // TODO(zdapeng): get rid of this class and let all ChannelBuilders always provide a non-null
1760
    //     channel creds.
1761
    final class DefaultChannelCreds extends ChannelCredentials {
1✔
1762
      @Override
1763
      public ChannelCredentials withoutBearerTokens() {
1764
        return this;
×
1765
      }
1766
    }
1767
  }
1768

1769
  final class NameResolverListener extends NameResolver.Listener2 {
1770
    final LbHelperImpl helper;
1771
    final NameResolver resolver;
1772

1773
    NameResolverListener(LbHelperImpl helperImpl, NameResolver resolver) {
1✔
1774
      this.helper = checkNotNull(helperImpl, "helperImpl");
1✔
1775
      this.resolver = checkNotNull(resolver, "resolver");
1✔
1776
    }
1✔
1777

1778
    @Override
1779
    public void onResult(final ResolutionResult resolutionResult) {
1780
      final class NamesResolved implements Runnable {
1✔
1781

1782
        @SuppressWarnings("ReferenceEquality")
1783
        @Override
1784
        public void run() {
1785
          if (ManagedChannelImpl.this.nameResolver != resolver) {
1✔
1786
            return;
1✔
1787
          }
1788

1789
          List<EquivalentAddressGroup> servers = resolutionResult.getAddresses();
1✔
1790
          channelLogger.log(
1✔
1791
              ChannelLogLevel.DEBUG,
1792
              "Resolved address: {0}, config={1}",
1793
              servers,
1794
              resolutionResult.getAttributes());
1✔
1795

1796
          if (lastResolutionState != ResolutionState.SUCCESS) {
1✔
1797
            channelLogger.log(ChannelLogLevel.INFO, "Address resolved: {0}", servers);
1✔
1798
            lastResolutionState = ResolutionState.SUCCESS;
1✔
1799
          }
1800

1801
          ConfigOrError configOrError = resolutionResult.getServiceConfig();
1✔
1802
          ResolutionResultListener resolutionResultListener = resolutionResult.getAttributes()
1✔
1803
              .get(RetryingNameResolver.RESOLUTION_RESULT_LISTENER_KEY);
1✔
1804
          InternalConfigSelector resolvedConfigSelector =
1✔
1805
              resolutionResult.getAttributes().get(InternalConfigSelector.KEY);
1✔
1806
          ManagedChannelServiceConfig validServiceConfig =
1807
              configOrError != null && configOrError.getConfig() != null
1✔
1808
                  ? (ManagedChannelServiceConfig) configOrError.getConfig()
1✔
1809
                  : null;
1✔
1810
          Status serviceConfigError = configOrError != null ? configOrError.getError() : null;
1✔
1811

1812
          ManagedChannelServiceConfig effectiveServiceConfig;
1813
          if (!lookUpServiceConfig) {
1✔
1814
            if (validServiceConfig != null) {
1✔
1815
              channelLogger.log(
1✔
1816
                  ChannelLogLevel.INFO,
1817
                  "Service config from name resolver discarded by channel settings");
1818
            }
1819
            effectiveServiceConfig =
1820
                defaultServiceConfig == null ? EMPTY_SERVICE_CONFIG : defaultServiceConfig;
1✔
1821
            if (resolvedConfigSelector != null) {
1✔
1822
              channelLogger.log(
1✔
1823
                  ChannelLogLevel.INFO,
1824
                  "Config selector from name resolver discarded by channel settings");
1825
            }
1826
            realChannel.updateConfigSelector(effectiveServiceConfig.getDefaultConfigSelector());
1✔
1827
          } else {
1828
            // Try to use config if returned from name resolver
1829
            // Otherwise, try to use the default config if available
1830
            if (validServiceConfig != null) {
1✔
1831
              effectiveServiceConfig = validServiceConfig;
1✔
1832
              if (resolvedConfigSelector != null) {
1✔
1833
                realChannel.updateConfigSelector(resolvedConfigSelector);
1✔
1834
                if (effectiveServiceConfig.getDefaultConfigSelector() != null) {
1✔
1835
                  channelLogger.log(
×
1836
                      ChannelLogLevel.DEBUG,
1837
                      "Method configs in service config will be discarded due to presence of"
1838
                          + "config-selector");
1839
                }
1840
              } else {
1841
                realChannel.updateConfigSelector(effectiveServiceConfig.getDefaultConfigSelector());
1✔
1842
              }
1843
            } else if (defaultServiceConfig != null) {
1✔
1844
              effectiveServiceConfig = defaultServiceConfig;
1✔
1845
              realChannel.updateConfigSelector(effectiveServiceConfig.getDefaultConfigSelector());
1✔
1846
              channelLogger.log(
1✔
1847
                  ChannelLogLevel.INFO,
1848
                  "Received no service config, using default service config");
1849
            } else if (serviceConfigError != null) {
1✔
1850
              if (!serviceConfigUpdated) {
1✔
1851
                // First DNS lookup has invalid service config, and cannot fall back to default
1852
                channelLogger.log(
1✔
1853
                    ChannelLogLevel.INFO,
1854
                    "Fallback to error due to invalid first service config without default config");
1855
                // This error could be an "inappropriate" control plane error that should not bleed
1856
                // through to client code using gRPC. We let them flow through here to the LB as
1857
                // we later check for these error codes when investigating pick results in
1858
                // GrpcUtil.getTransportFromPickResult().
1859
                onError(configOrError.getError());
1✔
1860
                if (resolutionResultListener != null) {
1✔
1861
                  resolutionResultListener.resolutionAttempted(configOrError.getError());
1✔
1862
                }
1863
                return;
1✔
1864
              } else {
1865
                effectiveServiceConfig = lastServiceConfig;
1✔
1866
              }
1867
            } else {
1868
              effectiveServiceConfig = EMPTY_SERVICE_CONFIG;
1✔
1869
              realChannel.updateConfigSelector(null);
1✔
1870
            }
1871
            if (!effectiveServiceConfig.equals(lastServiceConfig)) {
1✔
1872
              channelLogger.log(
1✔
1873
                  ChannelLogLevel.INFO,
1874
                  "Service config changed{0}",
1875
                  effectiveServiceConfig == EMPTY_SERVICE_CONFIG ? " to empty" : "");
1✔
1876
              lastServiceConfig = effectiveServiceConfig;
1✔
1877
              transportProvider.throttle = effectiveServiceConfig.getRetryThrottling();
1✔
1878
            }
1879

1880
            try {
1881
              // TODO(creamsoup): when `servers` is empty and lastResolutionStateCopy == SUCCESS
1882
              //  and lbNeedAddress, it shouldn't call the handleServiceConfigUpdate. But,
1883
              //  lbNeedAddress is not deterministic
1884
              serviceConfigUpdated = true;
1✔
1885
            } catch (RuntimeException re) {
×
1886
              logger.log(
×
1887
                  Level.WARNING,
1888
                  "[" + getLogId() + "] Unexpected exception from parsing service config",
×
1889
                  re);
1890
            }
1✔
1891
          }
1892

1893
          Attributes effectiveAttrs = resolutionResult.getAttributes();
1✔
1894
          // Call LB only if it's not shutdown.  If LB is shutdown, lbHelper won't match.
1895
          if (NameResolverListener.this.helper == ManagedChannelImpl.this.lbHelper) {
1✔
1896
            Attributes.Builder attrBuilder =
1✔
1897
                effectiveAttrs.toBuilder().discard(InternalConfigSelector.KEY);
1✔
1898
            Map<String, ?> healthCheckingConfig =
1✔
1899
                effectiveServiceConfig.getHealthCheckingConfig();
1✔
1900
            if (healthCheckingConfig != null) {
1✔
1901
              attrBuilder
1✔
1902
                  .set(LoadBalancer.ATTR_HEALTH_CHECKING_CONFIG, healthCheckingConfig)
1✔
1903
                  .build();
1✔
1904
            }
1905
            Attributes attributes = attrBuilder.build();
1✔
1906

1907
            Status addressAcceptanceStatus = helper.lb.tryAcceptResolvedAddresses(
1✔
1908
                ResolvedAddresses.newBuilder()
1✔
1909
                    .setAddresses(servers)
1✔
1910
                    .setAttributes(attributes)
1✔
1911
                    .setLoadBalancingPolicyConfig(effectiveServiceConfig.getLoadBalancingConfig())
1✔
1912
                    .build());
1✔
1913
            // If a listener is provided, let it know if the addresses were accepted.
1914
            if (resolutionResultListener != null) {
1✔
1915
              resolutionResultListener.resolutionAttempted(addressAcceptanceStatus);
1✔
1916
            }
1917
          }
1918
        }
1✔
1919
      }
1920

1921
      syncContext.execute(new NamesResolved());
1✔
1922
    }
1✔
1923

1924
    @Override
1925
    public void onError(final Status error) {
1926
      checkArgument(!error.isOk(), "the error status must not be OK");
1✔
1927
      final class NameResolverErrorHandler implements Runnable {
1✔
1928
        @Override
1929
        public void run() {
1930
          handleErrorInSyncContext(error);
1✔
1931
        }
1✔
1932
      }
1933

1934
      syncContext.execute(new NameResolverErrorHandler());
1✔
1935
    }
1✔
1936

1937
    private void handleErrorInSyncContext(Status error) {
1938
      logger.log(Level.WARNING, "[{0}] Failed to resolve name. status={1}",
1✔
1939
          new Object[] {getLogId(), error});
1✔
1940
      realChannel.onConfigError();
1✔
1941
      if (lastResolutionState != ResolutionState.ERROR) {
1✔
1942
        channelLogger.log(ChannelLogLevel.WARNING, "Failed to resolve name: {0}", error);
1✔
1943
        lastResolutionState = ResolutionState.ERROR;
1✔
1944
      }
1945
      // Call LB only if it's not shutdown.  If LB is shutdown, lbHelper won't match.
1946
      if (NameResolverListener.this.helper != ManagedChannelImpl.this.lbHelper) {
1✔
1947
        return;
1✔
1948
      }
1949

1950
      helper.lb.handleNameResolutionError(error);
1✔
1951
    }
1✔
1952
  }
1953

1954
  private final class SubchannelImpl extends AbstractSubchannel {
1955
    final CreateSubchannelArgs args;
1956
    final InternalLogId subchannelLogId;
1957
    final ChannelLoggerImpl subchannelLogger;
1958
    final ChannelTracer subchannelTracer;
1959
    List<EquivalentAddressGroup> addressGroups;
1960
    InternalSubchannel subchannel;
1961
    boolean started;
1962
    boolean shutdown;
1963
    ScheduledHandle delayedShutdownTask;
1964

1965
    SubchannelImpl(CreateSubchannelArgs args) {
1✔
1966
      checkNotNull(args, "args");
1✔
1967
      addressGroups = args.getAddresses();
1✔
1968
      if (authorityOverride != null) {
1✔
1969
        List<EquivalentAddressGroup> eagsWithoutOverrideAttr =
1✔
1970
            stripOverrideAuthorityAttributes(args.getAddresses());
1✔
1971
        args = args.toBuilder().setAddresses(eagsWithoutOverrideAttr).build();
1✔
1972
      }
1973
      this.args = args;
1✔
1974
      subchannelLogId = InternalLogId.allocate("Subchannel", /*details=*/ authority());
1✔
1975
      subchannelTracer = new ChannelTracer(
1✔
1976
          subchannelLogId, maxTraceEvents, timeProvider.currentTimeNanos(),
1✔
1977
          "Subchannel for " + args.getAddresses());
1✔
1978
      subchannelLogger = new ChannelLoggerImpl(subchannelTracer, timeProvider);
1✔
1979
    }
1✔
1980

1981
    @Override
1982
    public void start(final SubchannelStateListener listener) {
1983
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1984
      checkState(!started, "already started");
1✔
1985
      checkState(!shutdown, "already shutdown");
1✔
1986
      checkState(!terminating, "Channel is being terminated");
1✔
1987
      started = true;
1✔
1988
      final class ManagedInternalSubchannelCallback extends InternalSubchannel.Callback {
1✔
1989
        // All callbacks are run in syncContext
1990
        @Override
1991
        void onTerminated(InternalSubchannel is) {
1992
          subchannels.remove(is);
1✔
1993
          channelz.removeSubchannel(is);
1✔
1994
          maybeTerminateChannel();
1✔
1995
        }
1✔
1996

1997
        @Override
1998
        void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) {
1999
          checkState(listener != null, "listener is null");
1✔
2000
          listener.onSubchannelState(newState);
1✔
2001
        }
1✔
2002

2003
        @Override
2004
        void onInUse(InternalSubchannel is) {
2005
          inUseStateAggregator.updateObjectInUse(is, true);
1✔
2006
        }
1✔
2007

2008
        @Override
2009
        void onNotInUse(InternalSubchannel is) {
2010
          inUseStateAggregator.updateObjectInUse(is, false);
1✔
2011
        }
1✔
2012
      }
2013

2014
      final InternalSubchannel internalSubchannel = new InternalSubchannel(
1✔
2015
          args.getAddresses(),
1✔
2016
          authority(),
1✔
2017
          userAgent,
1✔
2018
          backoffPolicyProvider,
1✔
2019
          transportFactory,
1✔
2020
          transportFactory.getScheduledExecutorService(),
1✔
2021
          stopwatchSupplier,
1✔
2022
          syncContext,
2023
          new ManagedInternalSubchannelCallback(),
2024
          channelz,
1✔
2025
          callTracerFactory.create(),
1✔
2026
          subchannelTracer,
2027
          subchannelLogId,
2028
          subchannelLogger,
2029
          transportFilters);
1✔
2030

2031
      channelTracer.reportEvent(new ChannelTrace.Event.Builder()
1✔
2032
          .setDescription("Child Subchannel started")
1✔
2033
          .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
1✔
2034
          .setTimestampNanos(timeProvider.currentTimeNanos())
1✔
2035
          .setSubchannelRef(internalSubchannel)
1✔
2036
          .build());
1✔
2037

2038
      this.subchannel = internalSubchannel;
1✔
2039
      channelz.addSubchannel(internalSubchannel);
1✔
2040
      subchannels.add(internalSubchannel);
1✔
2041
    }
1✔
2042

2043
    @Override
2044
    InternalInstrumented<ChannelStats> getInstrumentedInternalSubchannel() {
2045
      checkState(started, "not started");
1✔
2046
      return subchannel;
1✔
2047
    }
2048

2049
    @Override
2050
    public void shutdown() {
2051
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
2052
      if (subchannel == null) {
1✔
2053
        // start() was not successful
2054
        shutdown = true;
×
2055
        return;
×
2056
      }
2057
      if (shutdown) {
1✔
2058
        if (terminating && delayedShutdownTask != null) {
1✔
2059
          // shutdown() was previously called when terminating == false, thus a delayed shutdown()
2060
          // was scheduled.  Now since terminating == true, We should expedite the shutdown.
2061
          delayedShutdownTask.cancel();
×
2062
          delayedShutdownTask = null;
×
2063
          // Will fall through to the subchannel.shutdown() at the end.
2064
        } else {
2065
          return;
1✔
2066
        }
2067
      } else {
2068
        shutdown = true;
1✔
2069
      }
2070
      // Add a delay to shutdown to deal with the race between 1) a transport being picked and
2071
      // newStream() being called on it, and 2) its Subchannel is shut down by LoadBalancer (e.g.,
2072
      // because of address change, or because LoadBalancer is shutdown by Channel entering idle
2073
      // mode). If (2) wins, the app will see a spurious error. We work around this by delaying
2074
      // shutdown of Subchannel for a few seconds here.
2075
      //
2076
      // TODO(zhangkun83): consider a better approach
2077
      // (https://github.com/grpc/grpc-java/issues/2562).
2078
      if (!terminating) {
1✔
2079
        final class ShutdownSubchannel implements Runnable {
1✔
2080
          @Override
2081
          public void run() {
2082
            subchannel.shutdown(SUBCHANNEL_SHUTDOWN_STATUS);
1✔
2083
          }
1✔
2084
        }
2085

2086
        delayedShutdownTask = syncContext.schedule(
1✔
2087
            new LogExceptionRunnable(new ShutdownSubchannel()),
2088
            SUBCHANNEL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS,
2089
            transportFactory.getScheduledExecutorService());
1✔
2090
        return;
1✔
2091
      }
2092
      // When terminating == true, no more real streams will be created. It's safe and also
2093
      // desirable to shutdown timely.
2094
      subchannel.shutdown(SHUTDOWN_STATUS);
1✔
2095
    }
1✔
2096

2097
    @Override
2098
    public void requestConnection() {
2099
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
2100
      checkState(started, "not started");
1✔
2101
      subchannel.obtainActiveTransport();
1✔
2102
    }
1✔
2103

2104
    @Override
2105
    public List<EquivalentAddressGroup> getAllAddresses() {
2106
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
2107
      checkState(started, "not started");
1✔
2108
      return addressGroups;
1✔
2109
    }
2110

2111
    @Override
2112
    public Attributes getAttributes() {
2113
      return args.getAttributes();
1✔
2114
    }
2115

2116
    @Override
2117
    public String toString() {
2118
      return subchannelLogId.toString();
1✔
2119
    }
2120

2121
    @Override
2122
    public Channel asChannel() {
2123
      checkState(started, "not started");
1✔
2124
      return new SubchannelChannel(
1✔
2125
          subchannel, balancerRpcExecutorHolder.getExecutor(),
1✔
2126
          transportFactory.getScheduledExecutorService(),
1✔
2127
          callTracerFactory.create(),
1✔
2128
          new AtomicReference<InternalConfigSelector>(null));
2129
    }
2130

2131
    @Override
2132
    public Object getInternalSubchannel() {
2133
      checkState(started, "Subchannel is not started");
1✔
2134
      return subchannel;
1✔
2135
    }
2136

2137
    @Override
2138
    public ChannelLogger getChannelLogger() {
2139
      return subchannelLogger;
1✔
2140
    }
2141

2142
    @Override
2143
    public void updateAddresses(List<EquivalentAddressGroup> addrs) {
2144
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
2145
      addressGroups = addrs;
1✔
2146
      if (authorityOverride != null) {
1✔
2147
        addrs = stripOverrideAuthorityAttributes(addrs);
1✔
2148
      }
2149
      subchannel.updateAddresses(addrs);
1✔
2150
    }
1✔
2151

2152
    private List<EquivalentAddressGroup> stripOverrideAuthorityAttributes(
2153
        List<EquivalentAddressGroup> eags) {
2154
      List<EquivalentAddressGroup> eagsWithoutOverrideAttr = new ArrayList<>();
1✔
2155
      for (EquivalentAddressGroup eag : eags) {
1✔
2156
        EquivalentAddressGroup eagWithoutOverrideAttr = new EquivalentAddressGroup(
1✔
2157
            eag.getAddresses(),
1✔
2158
            eag.getAttributes().toBuilder().discard(ATTR_AUTHORITY_OVERRIDE).build());
1✔
2159
        eagsWithoutOverrideAttr.add(eagWithoutOverrideAttr);
1✔
2160
      }
1✔
2161
      return Collections.unmodifiableList(eagsWithoutOverrideAttr);
1✔
2162
    }
2163
  }
2164

2165
  @Override
2166
  public String toString() {
2167
    return MoreObjects.toStringHelper(this)
1✔
2168
        .add("logId", logId.getId())
1✔
2169
        .add("target", target)
1✔
2170
        .toString();
1✔
2171
  }
2172

2173
  /**
2174
   * Called from syncContext.
2175
   */
2176
  private final class DelayedTransportListener implements ManagedClientTransport.Listener {
1✔
2177
    @Override
2178
    public void transportShutdown(Status s) {
2179
      checkState(shutdown.get(), "Channel must have been shut down");
1✔
2180
    }
1✔
2181

2182
    @Override
2183
    public void transportReady() {
2184
      // Don't care
2185
    }
×
2186

2187
    @Override
2188
    public Attributes filterTransport(Attributes attributes) {
2189
      return attributes;
×
2190
    }
2191

2192
    @Override
2193
    public void transportInUse(final boolean inUse) {
2194
      inUseStateAggregator.updateObjectInUse(delayedTransport, inUse);
1✔
2195
    }
1✔
2196

2197
    @Override
2198
    public void transportTerminated() {
2199
      checkState(shutdown.get(), "Channel must have been shut down");
1✔
2200
      terminating = true;
1✔
2201
      shutdownNameResolverAndLoadBalancer(false);
1✔
2202
      // No need to call channelStateManager since we are already in SHUTDOWN state.
2203
      // Until LoadBalancer is shutdown, it may still create new subchannels.  We catch them
2204
      // here.
2205
      maybeShutdownNowSubchannels();
1✔
2206
      maybeTerminateChannel();
1✔
2207
    }
1✔
2208
  }
2209

2210
  /**
2211
   * Must be accessed from syncContext.
2212
   */
2213
  private final class IdleModeStateAggregator extends InUseStateAggregator<Object> {
1✔
2214
    @Override
2215
    protected void handleInUse() {
2216
      exitIdleMode();
1✔
2217
    }
1✔
2218

2219
    @Override
2220
    protected void handleNotInUse() {
2221
      if (shutdown.get()) {
1✔
2222
        return;
1✔
2223
      }
2224
      rescheduleIdleTimer();
1✔
2225
    }
1✔
2226
  }
2227

2228
  /**
2229
   * Lazily request for Executor from an executor pool.
2230
   * Also act as an Executor directly to simply run a cmd
2231
   */
2232
  @VisibleForTesting
2233
  static final class ExecutorHolder implements Executor {
2234
    private final ObjectPool<? extends Executor> pool;
2235
    private Executor executor;
2236

2237
    ExecutorHolder(ObjectPool<? extends Executor> executorPool) {
1✔
2238
      this.pool = checkNotNull(executorPool, "executorPool");
1✔
2239
    }
1✔
2240

2241
    synchronized Executor getExecutor() {
2242
      if (executor == null) {
1✔
2243
        executor = checkNotNull(pool.getObject(), "%s.getObject()", executor);
1✔
2244
      }
2245
      return executor;
1✔
2246
    }
2247

2248
    synchronized void release() {
2249
      if (executor != null) {
1✔
2250
        executor = pool.returnObject(executor);
1✔
2251
      }
2252
    }
1✔
2253

2254
    @Override
2255
    public void execute(Runnable command) {
2256
      getExecutor().execute(command);
1✔
2257
    }
1✔
2258
  }
2259

2260
  private static final class RestrictedScheduledExecutor implements ScheduledExecutorService {
2261
    final ScheduledExecutorService delegate;
2262

2263
    private RestrictedScheduledExecutor(ScheduledExecutorService delegate) {
1✔
2264
      this.delegate = checkNotNull(delegate, "delegate");
1✔
2265
    }
1✔
2266

2267
    @Override
2268
    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
2269
      return delegate.schedule(callable, delay, unit);
×
2270
    }
2271

2272
    @Override
2273
    public ScheduledFuture<?> schedule(Runnable cmd, long delay, TimeUnit unit) {
2274
      return delegate.schedule(cmd, delay, unit);
1✔
2275
    }
2276

2277
    @Override
2278
    public ScheduledFuture<?> scheduleAtFixedRate(
2279
        Runnable command, long initialDelay, long period, TimeUnit unit) {
2280
      return delegate.scheduleAtFixedRate(command, initialDelay, period, unit);
×
2281
    }
2282

2283
    @Override
2284
    public ScheduledFuture<?> scheduleWithFixedDelay(
2285
        Runnable command, long initialDelay, long delay, TimeUnit unit) {
2286
      return delegate.scheduleWithFixedDelay(command, initialDelay, delay, unit);
×
2287
    }
2288

2289
    @Override
2290
    public boolean awaitTermination(long timeout, TimeUnit unit)
2291
        throws InterruptedException {
2292
      return delegate.awaitTermination(timeout, unit);
×
2293
    }
2294

2295
    @Override
2296
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
2297
        throws InterruptedException {
2298
      return delegate.invokeAll(tasks);
×
2299
    }
2300

2301
    @Override
2302
    public <T> List<Future<T>> invokeAll(
2303
        Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
2304
        throws InterruptedException {
2305
      return delegate.invokeAll(tasks, timeout, unit);
×
2306
    }
2307

2308
    @Override
2309
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
2310
        throws InterruptedException, ExecutionException {
2311
      return delegate.invokeAny(tasks);
×
2312
    }
2313

2314
    @Override
2315
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
2316
        throws InterruptedException, ExecutionException, TimeoutException {
2317
      return delegate.invokeAny(tasks, timeout, unit);
×
2318
    }
2319

2320
    @Override
2321
    public boolean isShutdown() {
2322
      return delegate.isShutdown();
×
2323
    }
2324

2325
    @Override
2326
    public boolean isTerminated() {
2327
      return delegate.isTerminated();
×
2328
    }
2329

2330
    @Override
2331
    public void shutdown() {
2332
      throw new UnsupportedOperationException("Restricted: shutdown() is not allowed");
1✔
2333
    }
2334

2335
    @Override
2336
    public List<Runnable> shutdownNow() {
2337
      throw new UnsupportedOperationException("Restricted: shutdownNow() is not allowed");
1✔
2338
    }
2339

2340
    @Override
2341
    public <T> Future<T> submit(Callable<T> task) {
2342
      return delegate.submit(task);
×
2343
    }
2344

2345
    @Override
2346
    public Future<?> submit(Runnable task) {
2347
      return delegate.submit(task);
×
2348
    }
2349

2350
    @Override
2351
    public <T> Future<T> submit(Runnable task, T result) {
2352
      return delegate.submit(task, result);
×
2353
    }
2354

2355
    @Override
2356
    public void execute(Runnable command) {
2357
      delegate.execute(command);
×
2358
    }
×
2359
  }
2360

2361
  /**
2362
   * A ResolutionState indicates the status of last name resolution.
2363
   */
2364
  enum ResolutionState {
1✔
2365
    NO_RESOLUTION,
1✔
2366
    SUCCESS,
1✔
2367
    ERROR
1✔
2368
  }
2369
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc