grpc / grpc-java / #20136

06 Jan 2026 05:27AM UTC coverage: 88.693% (+0.01%) from 88.681%

push

github

web-flow
core: Implement oobChannel with resolvingOobChannel

The most important part of this change is to ensure that CallCredentials
are not propagated to the OOB channel. Because the authority of the OOB
channel doesn't match the parent channel, we must ensure that any bearer
tokens are not sent to the different server. However, this was not a
problem because resolvingOobChannel has the same constraint. (RLS has a
different constraint, but we were able to let RLS manage that itself.)
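
To make the credential constraint concrete, here is a minimal sketch. It
is an illustration only and does not reflect how grpc-java implements it:
ChannelSpec, forOobTarget, and the example authorities are hypothetical
names invented for this sketch. The point it shows is that a channel
derived for a different authority starts with no bearer token, whatever
the parent carried.

```java
import java.util.Optional;

/** Hypothetical stand-in for a channel's configuration; not a grpc-java type. */
final class ChannelSpec {
  final String authority;
  final Optional<String> bearerToken;

  ChannelSpec(String authority, Optional<String> bearerToken) {
    this.authority = authority;
    this.bearerToken = bearerToken;
  }

  /**
   * Derives the configuration for an out-of-band channel. The bearer token is
   * deliberately dropped because the OOB authority may differ from the parent's.
   */
  ChannelSpec forOobTarget(String oobAuthority) {
    return new ChannelSpec(oobAuthority, Optional.empty());
  }
}

final class OobCredentialsSketch {
  public static void main(String[] args) {
    ChannelSpec parent =
        new ChannelSpec("service.example.com", Optional.of("secret-token"));
    ChannelSpec oob = parent.forOobTarget("lb.example.com");
    // The parent keeps its token; the derived OOB spec never sees it.
    System.out.println(oob.authority + " hasToken=" + oob.bearerToken.isPresent());
  }
}
```

Dropping the token at derivation time, rather than filtering it per call,
means no code path on the OOB side can send it by accident.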

This commit does change the behavior of channelz, shutdown, and metrics
for the OOB channel. Previously the OOB channel was registered with
channelz, but that is only a TODO for the resolving OOB channel. Channel
shutdown no longer shuts down the OOB channel, and it no longer waits
for the OOB channel to terminate before becoming terminated itself. That
is also a pre-existing TODO. Since ManagedChannelImplBuilder is now
being used, global configurators and census are enabled. The proper
behavior here is still being determined, but we would want it to be the
same for the resolving OOB channel and the OOB channel.

The OOB channel used to refresh the name resolution when the subchannel
went IDLE or TRANSIENT_FAILURE. That is an older behavior from back when
regular subchannels would also cause the name resolver to refresh.
Nowadays that goes through the LB tree. gRPC-LB already refreshes name
resolution when its RPC closes, so no longer doing it automatically
should be fine.
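
As a rough sketch of the refresh-on-close idea, the policy that owns the
balancer RPC can request re-resolution itself when that RPC ends, so the
channel does not need to react to subchannel state changes.
CountingResolver and BalancerStream are hypothetical names for this
sketch, not grpc-java or gRPC-LB classes.

```java
/** Hypothetical resolver handle; it only counts refresh requests. */
final class CountingResolver {
  private int refreshCount;

  void refresh() {
    refreshCount++;
  }

  int refreshCount() {
    return refreshCount;
  }
}

/** Hypothetical balancer-side stream that asks for re-resolution when its RPC closes. */
final class BalancerStream {
  private final CountingResolver resolver;
  private boolean closed;

  BalancerStream(CountingResolver resolver) {
    this.resolver = resolver;
  }

  void onClosed() {
    if (closed) {
      return; // ignore duplicate close notifications
    }
    closed = true;
    resolver.refresh();
  }
}

final class RefreshOnCloseSketch {
  public static void main(String[] args) {
    CountingResolver resolver = new CountingResolver();
    BalancerStream stream = new BalancerStream(resolver);
    stream.onClosed();
    stream.onClosed(); // second notification does not trigger another refresh
    System.out.println("refresh requests: " + resolver.refreshCount());
  }
}
```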

balancerRpcExecutorPool no longer has its lifetime managed by the child.
It'd be easiest to not use it at all from OOB channel, which wouldn't
actually change the regular behavior, as channels already use the same
executor by default. However, the tests are making use of the executor
being injected, so some propagation needs to be preserved.
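
One way to picture an executor whose lifetime is not managed by the child
is a borrowing pool whose release is a no-op, while the parent's pool is
the only one that shuts the executor down. This is a sketch under that
assumption. ExecutorPool, OwningExecutorPool, and BorrowedExecutorPool
are hypothetical types, not the ObjectPool used by grpc-java, and the
commit may propagate the executor differently.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical minimal pool interface for this sketch. */
interface ExecutorPool {
  Executor get();

  void release(Executor executor);
}

/** Owning pool: creates the executor lazily and shuts it down on release. */
final class OwningExecutorPool implements ExecutorPool {
  private ExecutorService service;

  @Override
  public synchronized Executor get() {
    if (service == null) {
      service = Executors.newSingleThreadExecutor();
    }
    return service;
  }

  @Override
  public synchronized void release(Executor executor) {
    if (service != null) {
      service.shutdown();
      service = null;
    }
  }

  synchronized boolean isActive() {
    return service != null;
  }
}

/** Borrowing view handed to a child: release is a no-op, so the owner keeps control. */
final class BorrowedExecutorPool implements ExecutorPool {
  private final Executor shared;

  BorrowedExecutorPool(Executor shared) {
    this.shared = shared;
  }

  @Override
  public Executor get() {
    return shared;
  }

  @Override
  public void release(Executor executor) {
    // Intentionally empty: the executor's lifetime belongs to the owner.
  }
}

final class ExecutorLifetimeSketch {
  public static void main(String[] args) {
    OwningExecutorPool parentPool = new OwningExecutorPool();
    Executor parentExecutor = parentPool.get();
    ExecutorPool childPool = new BorrowedExecutorPool(parentExecutor);

    childPool.release(childPool.get()); // child is done; executor stays alive
    System.out.println("active after child release: " + parentPool.isActive());

    parentPool.release(parentExecutor); // only the parent shuts it down
    System.out.println("active after parent release: " + parentPool.isActive());
  }
}
```

The injected executor still reaches the child, which is what the tests
depend on, but releasing it from the child has no effect on the parent.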

Lots of OOB channel tests were deleted, but these were either testing
OobChannel, which is now gone, or things like channelz, which are known
to no longer work like before.

35361 of 39869 relevant lines covered (88.69%)

0.89 hits per line

Source File

93.21
/../core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
1
/*
2
 * Copyright 2016 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static com.google.common.base.Preconditions.checkState;
22
import static io.grpc.ClientStreamTracer.NAME_RESOLUTION_DELAYED;
23
import static io.grpc.ConnectivityState.CONNECTING;
24
import static io.grpc.ConnectivityState.IDLE;
25
import static io.grpc.ConnectivityState.SHUTDOWN;
26
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
27
import static io.grpc.EquivalentAddressGroup.ATTR_AUTHORITY_OVERRIDE;
28

29
import com.google.common.annotations.VisibleForTesting;
30
import com.google.common.base.MoreObjects;
31
import com.google.common.base.Stopwatch;
32
import com.google.common.base.Supplier;
33
import com.google.common.util.concurrent.ListenableFuture;
34
import com.google.common.util.concurrent.SettableFuture;
35
import com.google.errorprone.annotations.concurrent.GuardedBy;
36
import io.grpc.Attributes;
37
import io.grpc.CallCredentials;
38
import io.grpc.CallOptions;
39
import io.grpc.Channel;
40
import io.grpc.ChannelCredentials;
41
import io.grpc.ChannelLogger;
42
import io.grpc.ChannelLogger.ChannelLogLevel;
43
import io.grpc.ClientCall;
44
import io.grpc.ClientInterceptor;
45
import io.grpc.ClientInterceptors;
46
import io.grpc.ClientStreamTracer;
47
import io.grpc.ClientTransportFilter;
48
import io.grpc.CompressorRegistry;
49
import io.grpc.ConnectivityState;
50
import io.grpc.ConnectivityStateInfo;
51
import io.grpc.Context;
52
import io.grpc.Deadline;
53
import io.grpc.DecompressorRegistry;
54
import io.grpc.EquivalentAddressGroup;
55
import io.grpc.ForwardingChannelBuilder2;
56
import io.grpc.ForwardingClientCall;
57
import io.grpc.Grpc;
58
import io.grpc.InternalChannelz;
59
import io.grpc.InternalChannelz.ChannelStats;
60
import io.grpc.InternalChannelz.ChannelTrace;
61
import io.grpc.InternalConfigSelector;
62
import io.grpc.InternalInstrumented;
63
import io.grpc.InternalLogId;
64
import io.grpc.InternalWithLogId;
65
import io.grpc.LoadBalancer;
66
import io.grpc.LoadBalancer.CreateSubchannelArgs;
67
import io.grpc.LoadBalancer.PickResult;
68
import io.grpc.LoadBalancer.PickSubchannelArgs;
69
import io.grpc.LoadBalancer.ResolvedAddresses;
70
import io.grpc.LoadBalancer.SubchannelPicker;
71
import io.grpc.LoadBalancer.SubchannelStateListener;
72
import io.grpc.ManagedChannel;
73
import io.grpc.ManagedChannelBuilder;
74
import io.grpc.Metadata;
75
import io.grpc.MethodDescriptor;
76
import io.grpc.MetricInstrumentRegistry;
77
import io.grpc.MetricRecorder;
78
import io.grpc.NameResolver;
79
import io.grpc.NameResolver.ConfigOrError;
80
import io.grpc.NameResolver.ResolutionResult;
81
import io.grpc.NameResolverProvider;
82
import io.grpc.NameResolverRegistry;
83
import io.grpc.ProxyDetector;
84
import io.grpc.Status;
85
import io.grpc.StatusOr;
86
import io.grpc.SynchronizationContext;
87
import io.grpc.SynchronizationContext.ScheduledHandle;
88
import io.grpc.internal.AutoConfiguredLoadBalancerFactory.AutoConfiguredLoadBalancer;
89
import io.grpc.internal.ClientCallImpl.ClientStreamProvider;
90
import io.grpc.internal.ClientTransportFactory.SwapChannelCredentialsResult;
91
import io.grpc.internal.ManagedChannelImplBuilder.ClientTransportFactoryBuilder;
92
import io.grpc.internal.ManagedChannelImplBuilder.FixedPortProvider;
93
import io.grpc.internal.ManagedChannelServiceConfig.MethodInfo;
94
import io.grpc.internal.ManagedChannelServiceConfig.ServiceConfigConvertedSelector;
95
import io.grpc.internal.RetriableStream.ChannelBufferMeter;
96
import io.grpc.internal.RetriableStream.Throttle;
97
import java.net.URI;
98
import java.net.URISyntaxException;
99
import java.util.ArrayList;
100
import java.util.Collection;
101
import java.util.Collections;
102
import java.util.HashSet;
103
import java.util.LinkedHashSet;
104
import java.util.List;
105
import java.util.Map;
106
import java.util.Set;
107
import java.util.concurrent.Callable;
108
import java.util.concurrent.CountDownLatch;
109
import java.util.concurrent.ExecutionException;
110
import java.util.concurrent.Executor;
111
import java.util.concurrent.Future;
112
import java.util.concurrent.ScheduledExecutorService;
113
import java.util.concurrent.ScheduledFuture;
114
import java.util.concurrent.TimeUnit;
115
import java.util.concurrent.TimeoutException;
116
import java.util.concurrent.atomic.AtomicBoolean;
117
import java.util.concurrent.atomic.AtomicReference;
118
import java.util.logging.Level;
119
import java.util.logging.Logger;
120
import javax.annotation.Nullable;
121
import javax.annotation.concurrent.ThreadSafe;
122

123
/** A communication channel for making outgoing RPCs. */
124
@ThreadSafe
125
final class ManagedChannelImpl extends ManagedChannel implements
126
    InternalInstrumented<ChannelStats> {
127
  @VisibleForTesting
128
  static final Logger logger = Logger.getLogger(ManagedChannelImpl.class.getName());
1✔
129

130
  static final long IDLE_TIMEOUT_MILLIS_DISABLE = -1;
131

132
  static final long SUBCHANNEL_SHUTDOWN_DELAY_SECONDS = 5;
133

134
  @VisibleForTesting
135
  static final Status SHUTDOWN_NOW_STATUS =
1✔
136
      Status.UNAVAILABLE.withDescription("Channel shutdownNow invoked");
1✔
137

138
  @VisibleForTesting
139
  static final Status SHUTDOWN_STATUS =
1✔
140
      Status.UNAVAILABLE.withDescription("Channel shutdown invoked");
1✔
141

142
  @VisibleForTesting
143
  static final Status SUBCHANNEL_SHUTDOWN_STATUS =
1✔
144
      Status.UNAVAILABLE.withDescription("Subchannel shutdown invoked");
1✔
145

146
  private static final ManagedChannelServiceConfig EMPTY_SERVICE_CONFIG =
147
      ManagedChannelServiceConfig.empty();
1✔
148
  private static final InternalConfigSelector INITIAL_PENDING_SELECTOR =
1✔
149
      new InternalConfigSelector() {
1✔
150
        @Override
151
        public Result selectConfig(PickSubchannelArgs args) {
152
          throw new IllegalStateException("Resolution is pending");
×
153
        }
154
      };
155
  private static final LoadBalancer.PickDetailsConsumer NOOP_PICK_DETAILS_CONSUMER =
1✔
156
      new LoadBalancer.PickDetailsConsumer() {};
1✔
157

158
  private final InternalLogId logId;
159
  private final String target;
160
  @Nullable
161
  private final String authorityOverride;
162
  private final NameResolverRegistry nameResolverRegistry;
163
  private final URI targetUri;
164
  private final NameResolverProvider nameResolverProvider;
165
  private final NameResolver.Args nameResolverArgs;
166
  private final AutoConfiguredLoadBalancerFactory loadBalancerFactory;
167
  private final ClientTransportFactory originalTransportFactory;
168
  @Nullable
169
  private final ChannelCredentials originalChannelCreds;
170
  private final ClientTransportFactory transportFactory;
171
  private final RestrictedScheduledExecutor scheduledExecutor;
172
  private final Executor executor;
173
  private final ObjectPool<? extends Executor> executorPool;
174
  private final ExecutorHolder balancerRpcExecutorHolder;
175
  private final ExecutorHolder offloadExecutorHolder;
176
  private final TimeProvider timeProvider;
177
  private final int maxTraceEvents;
178

179
  @VisibleForTesting
1✔
180
  final SynchronizationContext syncContext = new SynchronizationContext(
181
      new Thread.UncaughtExceptionHandler() {
1✔
182
        @Override
183
        public void uncaughtException(Thread t, Throwable e) {
184
          logger.log(
1✔
185
              Level.SEVERE,
186
              "[" + getLogId() + "] Uncaught exception in the SynchronizationContext. Panic!",
1✔
187
              e);
188
          try {
189
            panic(e);
1✔
190
          } catch (Throwable anotherT) {
×
191
            logger.log(
×
192
                Level.SEVERE, "[" + getLogId() + "] Uncaught exception while panicking", anotherT);
×
193
          }
1✔
194
        }
1✔
195
      });
196

197
  private boolean fullStreamDecompression;
198

199
  private final DecompressorRegistry decompressorRegistry;
200
  private final CompressorRegistry compressorRegistry;
201

202
  private final Supplier<Stopwatch> stopwatchSupplier;
203
  /** The timeout before entering idle mode. */
204
  private final long idleTimeoutMillis;
205

206
  private final ConnectivityStateManager channelStateManager = new ConnectivityStateManager();
1✔
207
  private final BackoffPolicy.Provider backoffPolicyProvider;
208

209
  /**
210
   * We delegate to this channel, so that we can have interceptors as necessary. If there aren't
211
   * any interceptors and the {@link io.grpc.BinaryLog} is {@code null} then this will just be a
212
   * {@link RealChannel}.
213
   */
214
  private final Channel interceptorChannel;
215

216
  private final List<ClientTransportFilter> transportFilters;
217
  @Nullable private final String userAgent;
218

219
  // Only null after channel is terminated. Must be assigned from the syncContext.
220
  private NameResolver nameResolver;
221

222
  // Must be accessed from the syncContext.
223
  private boolean nameResolverStarted;
224

225
  // null when channel is in idle mode.  Must be assigned from syncContext.
226
  @Nullable
227
  private LbHelperImpl lbHelper;
228

229
  // Must be accessed from the syncContext
230
  private boolean panicMode;
231

232
  // Must be mutated from syncContext
233
  // If any monitoring hook to be added later needs to get a snapshot of this Set, we could
234
  // switch to a ConcurrentHashMap.
235
  private final Set<InternalSubchannel> subchannels = new HashSet<>(16, .75f);
1✔
236

237
  // Must be accessed from syncContext
238
  @Nullable
239
  private Collection<RealChannel.PendingCall<?, ?>> pendingCalls;
240
  private final Object pendingCallsInUseObject = new Object();
1✔
241

242
  // reprocess() must be run from syncContext
243
  private final DelayedClientTransport delayedTransport;
244
  private final UncommittedRetriableStreamsRegistry uncommittedRetriableStreamsRegistry
1✔
245
      = new UncommittedRetriableStreamsRegistry();
246

247
  // Shutdown states.
248
  //
249
  // Channel's shutdown process:
250
  // 1. shutdown(): stop accepting new calls from applications
251
  //   1a shutdown <- true
252
  //   1b delayedTransport.shutdown()
253
  // 2. delayedTransport terminated: stop stream-creation functionality
254
  //   2a terminating <- true
255
  //   2b loadBalancer.shutdown()
256
  //     * LoadBalancer will shutdown subchannels and OOB channels
257
  //   2c loadBalancer <- null
258
  //   2d nameResolver.shutdown()
259
  //   2e nameResolver <- null
260
  // 3. All subchannels and OOB channels terminated: Channel considered terminated
261

262
  private final AtomicBoolean shutdown = new AtomicBoolean(false);
1✔
263
  // Must only be mutated and read from syncContext
264
  private boolean shutdownNowed;
265
  // Must only be mutated from syncContext
266
  private boolean terminating;
267
  // Must be mutated from syncContext
268
  private volatile boolean terminated;
269
  private final CountDownLatch terminatedLatch = new CountDownLatch(1);
1✔
270

271
  private final CallTracer.Factory callTracerFactory;
272
  private final CallTracer channelCallTracer;
273
  private final ChannelTracer channelTracer;
274
  private final ChannelLogger channelLogger;
275
  private final InternalChannelz channelz;
276
  private final RealChannel realChannel;
277
  // Must be mutated and read from syncContext
278
  // a flag for doing channel tracing when flipped
279
  private ResolutionState lastResolutionState = ResolutionState.NO_RESOLUTION;
1✔
280
  // Must be mutated and read from constructor or syncContext
281
  // used for channel tracing when value changed
282
  private ManagedChannelServiceConfig lastServiceConfig = EMPTY_SERVICE_CONFIG;
1✔
283

284
  @Nullable
285
  private final ManagedChannelServiceConfig defaultServiceConfig;
286
  // Must be mutated and read from constructor or syncContext
287
  private boolean serviceConfigUpdated = false;
1✔
288
  private final boolean lookUpServiceConfig;
289

290
  // One instance per channel.
291
  private final ChannelBufferMeter channelBufferUsed = new ChannelBufferMeter();
1✔
292

293
  private final long perRpcBufferLimit;
294
  private final long channelBufferLimit;
295

296
  // Temporary false flag that can skip the retry code path.
297
  private final boolean retryEnabled;
298

299
  private final Deadline.Ticker ticker = Deadline.getSystemTicker();
1✔
300

301
  // Called from syncContext
302
  private final ManagedClientTransport.Listener delayedTransportListener =
1✔
303
      new DelayedTransportListener();
304

305
  // Must be called from syncContext
306
  private void maybeShutdownNowSubchannels() {
307
    if (shutdownNowed) {
1✔
308
      for (InternalSubchannel subchannel : subchannels) {
1✔
309
        subchannel.shutdownNow(SHUTDOWN_NOW_STATUS);
1✔
310
      }
1✔
311
    }
312
  }
1✔
313

314
  // Must be accessed from syncContext
315
  @VisibleForTesting
1✔
316
  final InUseStateAggregator<Object> inUseStateAggregator = new IdleModeStateAggregator();
317

318
  @Override
319
  public ListenableFuture<ChannelStats> getStats() {
320
    final SettableFuture<ChannelStats> ret = SettableFuture.create();
1✔
321
    final class StatsFetcher implements Runnable {
1✔
322
      @Override
323
      public void run() {
324
        ChannelStats.Builder builder = new InternalChannelz.ChannelStats.Builder();
1✔
325
        channelCallTracer.updateBuilder(builder);
1✔
326
        channelTracer.updateBuilder(builder);
1✔
327
        builder.setTarget(target).setState(channelStateManager.getState());
1✔
328
        List<InternalWithLogId> children = new ArrayList<>();
1✔
329
        children.addAll(subchannels);
1✔
330
        builder.setSubchannels(children);
1✔
331
        ret.set(builder.build());
1✔
332
      }
1✔
333
    }
334

335
    // subchannels and oobchannels can only be accessed from syncContext
336
    syncContext.execute(new StatsFetcher());
1✔
337
    return ret;
1✔
338
  }
339

340
  @Override
341
  public InternalLogId getLogId() {
342
    return logId;
1✔
343
  }
344

345
  // Run from syncContext
346
  private class IdleModeTimer implements Runnable {
1✔
347

348
    @Override
349
    public void run() {
350
      // Workaround timer scheduled while in idle mode. This can happen from handleNotInUse() after
351
      // an explicit enterIdleMode() by the user. Protecting here as other locations are a bit too
352
      // subtle to change rapidly to resolve the channel panic. See #8714
353
      if (lbHelper == null) {
1✔
354
        return;
×
355
      }
356
      enterIdleMode();
1✔
357
    }
1✔
358
  }
359

360
  // Must be called from syncContext
361
  private void shutdownNameResolverAndLoadBalancer(boolean channelIsActive) {
362
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
363
    if (channelIsActive) {
1✔
364
      checkState(nameResolverStarted, "nameResolver is not started");
1✔
365
      checkState(lbHelper != null, "lbHelper is null");
1✔
366
    }
367
    if (nameResolver != null) {
1✔
368
      nameResolver.shutdown();
1✔
369
      nameResolverStarted = false;
1✔
370
      if (channelIsActive) {
1✔
371
        nameResolver = getNameResolver(
1✔
372
            targetUri, authorityOverride, nameResolverProvider, nameResolverArgs);
373
      } else {
374
        nameResolver = null;
1✔
375
      }
376
    }
377
    if (lbHelper != null) {
1✔
378
      lbHelper.lb.shutdown();
1✔
379
      lbHelper = null;
1✔
380
    }
381
  }
1✔
382

383
  /**
384
   * Make the channel exit idle mode, if it's in it.
385
   *
386
   * <p>Must be called from syncContext
387
   */
388
  @VisibleForTesting
389
  void exitIdleMode() {
390
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
391
    if (shutdown.get() || panicMode) {
1✔
392
      return;
1✔
393
    }
394
    if (inUseStateAggregator.isInUse()) {
1✔
395
      // Cancel the timer now, so that a racing due timer will not put Channel on idleness
396
      // when the caller of exitIdleMode() is about to use the returned loadBalancer.
397
      cancelIdleTimer(false);
1✔
398
    } else {
399
      // exitIdleMode() may be called outside of inUseStateAggregator.handleNotInUse() while
400
      // isInUse() == false, in which case we still need to schedule the timer.
401
      rescheduleIdleTimer();
1✔
402
    }
403
    if (lbHelper != null) {
1✔
404
      return;
1✔
405
    }
406
    channelLogger.log(ChannelLogLevel.INFO, "Exiting idle mode");
1✔
407
    LbHelperImpl lbHelper = new LbHelperImpl();
1✔
408
    lbHelper.lb = loadBalancerFactory.newLoadBalancer(lbHelper);
1✔
409
    // Delay setting lbHelper until fully initialized, since loadBalancerFactory is user code and
410
    // may throw. We don't want to confuse our state, even if we enter panic mode.
411
    this.lbHelper = lbHelper;
1✔
412

413
    channelStateManager.gotoState(CONNECTING);
1✔
414
    NameResolverListener listener = new NameResolverListener(lbHelper, nameResolver);
1✔
415
    nameResolver.start(listener);
1✔
416
    nameResolverStarted = true;
1✔
417
  }
1✔
418

419
  // Must be run from syncContext
420
  private void enterIdleMode() {
421
    // nameResolver and loadBalancer are guaranteed to be non-null.  If any of them were null,
422
    // either the idleModeTimer ran twice without exiting the idle mode, or the task in shutdown()
423
    // did not cancel idleModeTimer, or enterIdle() ran while shutdown or in idle, all of
424
    // which are bugs.
425
    shutdownNameResolverAndLoadBalancer(true);
1✔
426
    delayedTransport.reprocess(null);
1✔
427
    channelLogger.log(ChannelLogLevel.INFO, "Entering IDLE state");
1✔
428
    channelStateManager.gotoState(IDLE);
1✔
429
    // If the inUseStateAggregator still considers pending calls to be queued up or the delayed
430
    // transport to be holding some we need to exit idle mode to give these calls a chance to
431
    // be processed.
432
    if (inUseStateAggregator.anyObjectInUse(pendingCallsInUseObject, delayedTransport)) {
1✔
433
      exitIdleMode();
1✔
434
    }
435
  }
1✔
436

437
  // Must be run from syncContext
438
  private void cancelIdleTimer(boolean permanent) {
439
    idleTimer.cancel(permanent);
1✔
440
  }
1✔
441

442
  // Always run from syncContext
443
  private void rescheduleIdleTimer() {
444
    if (idleTimeoutMillis == IDLE_TIMEOUT_MILLIS_DISABLE) {
1✔
445
      return;
1✔
446
    }
447
    idleTimer.reschedule(idleTimeoutMillis, TimeUnit.MILLISECONDS);
1✔
448
  }
1✔
449

450
  /**
451
   * Force name resolution refresh to happen immediately. Must be run
452
   * from syncContext.
453
   */
454
  private void refreshNameResolution() {
455
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
456
    if (nameResolverStarted) {
1✔
457
      nameResolver.refresh();
1✔
458
    }
459
  }
1✔
460

461
  private final class ChannelStreamProvider implements ClientStreamProvider {
1✔
462
    volatile Throttle throttle;
463

464
    @Override
465
    public ClientStream newStream(
466
        final MethodDescriptor<?, ?> method,
467
        final CallOptions callOptions,
468
        final Metadata headers,
469
        final Context context) {
470
      // There is no need to reschedule the idle timer here. If the channel isn't shut down, either
471
      // the delayed transport or a real transport will go in-use and cancel the idle timer.
472
      if (!retryEnabled) {
1✔
473
        ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers(
1✔
474
            callOptions, headers, 0, /* isTransparentRetry= */ false,
475
            /* isHedging= */false);
476
        Context origContext = context.attach();
1✔
477
        try {
478
          return delayedTransport.newStream(method, headers, callOptions, tracers);
1✔
479
        } finally {
480
          context.detach(origContext);
1✔
481
        }
482
      } else {
483
        MethodInfo methodInfo = callOptions.getOption(MethodInfo.KEY);
1✔
484
        final RetryPolicy retryPolicy = methodInfo == null ? null : methodInfo.retryPolicy;
1✔
485
        final HedgingPolicy hedgingPolicy = methodInfo == null ? null : methodInfo.hedgingPolicy;
1✔
486
        final class RetryStream<ReqT> extends RetriableStream<ReqT> {
487
          @SuppressWarnings("unchecked")
488
          RetryStream() {
1✔
489
            super(
1✔
490
                (MethodDescriptor<ReqT, ?>) method,
491
                headers,
492
                channelBufferUsed,
1✔
493
                perRpcBufferLimit,
1✔
494
                channelBufferLimit,
1✔
495
                getCallExecutor(callOptions),
1✔
496
                transportFactory.getScheduledExecutorService(),
1✔
497
                retryPolicy,
498
                hedgingPolicy,
499
                throttle);
500
          }
1✔
501

502
          @Override
503
          Status prestart() {
504
            return uncommittedRetriableStreamsRegistry.add(this);
1✔
505
          }
506

507
          @Override
508
          void postCommit() {
509
            uncommittedRetriableStreamsRegistry.remove(this);
1✔
510
          }
1✔
511

512
          @Override
513
          ClientStream newSubstream(
514
              Metadata newHeaders, ClientStreamTracer.Factory factory, int previousAttempts,
515
              boolean isTransparentRetry, boolean isHedgedStream) {
516
            CallOptions newOptions = callOptions.withStreamTracerFactory(factory);
1✔
517
            ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers(
1✔
518
                newOptions, newHeaders, previousAttempts, isTransparentRetry, isHedgedStream);
519
            Context origContext = context.attach();
1✔
520
            try {
521
              return delayedTransport.newStream(method, newHeaders, newOptions, tracers);
1✔
522
            } finally {
523
              context.detach(origContext);
1✔
524
            }
525
          }
526
        }
527

528
        return new RetryStream<>();
1✔
529
      }
530
    }
531
  }
532

533
  private final ChannelStreamProvider transportProvider = new ChannelStreamProvider();
1✔
534

535
  private final Rescheduler idleTimer;
536
  private final MetricRecorder metricRecorder;
537

538
  ManagedChannelImpl(
539
      ManagedChannelImplBuilder builder,
540
      ClientTransportFactory clientTransportFactory,
541
      URI targetUri,
542
      NameResolverProvider nameResolverProvider,
543
      BackoffPolicy.Provider backoffPolicyProvider,
544
      ObjectPool<? extends Executor> balancerRpcExecutorPool,
545
      Supplier<Stopwatch> stopwatchSupplier,
546
      List<ClientInterceptor> interceptors,
547
      final TimeProvider timeProvider) {
1✔
548
    this.target = checkNotNull(builder.target, "target");
1✔
549
    this.logId = InternalLogId.allocate("Channel", target);
1✔
550
    this.timeProvider = checkNotNull(timeProvider, "timeProvider");
1✔
551
    this.executorPool = checkNotNull(builder.executorPool, "executorPool");
1✔
552
    this.executor = checkNotNull(executorPool.getObject(), "executor");
1✔
553
    this.originalChannelCreds = builder.channelCredentials;
1✔
554
    this.originalTransportFactory = clientTransportFactory;
1✔
555
    this.offloadExecutorHolder =
1✔
556
        new ExecutorHolder(checkNotNull(builder.offloadExecutorPool, "offloadExecutorPool"));
1✔
557
    this.transportFactory = new CallCredentialsApplyingTransportFactory(
1✔
558
        clientTransportFactory, builder.callCredentials, this.offloadExecutorHolder);
559
    this.scheduledExecutor =
1✔
560
        new RestrictedScheduledExecutor(transportFactory.getScheduledExecutorService());
1✔
561
    maxTraceEvents = builder.maxTraceEvents;
1✔
562
    channelTracer = new ChannelTracer(
1✔
563
        logId, builder.maxTraceEvents, timeProvider.currentTimeNanos(),
1✔
564
        "Channel for '" + target + "'");
565
    channelLogger = new ChannelLoggerImpl(channelTracer, timeProvider);
1✔
566
    ProxyDetector proxyDetector =
567
        builder.proxyDetector != null ? builder.proxyDetector : GrpcUtil.DEFAULT_PROXY_DETECTOR;
1✔
568
    this.retryEnabled = builder.retryEnabled;
1✔
569
    this.loadBalancerFactory = new AutoConfiguredLoadBalancerFactory(builder.defaultLbPolicy);
1✔
570
    this.nameResolverRegistry = builder.nameResolverRegistry;
1✔
571
    this.targetUri = checkNotNull(targetUri, "targetUri");
1✔
572
    this.nameResolverProvider = checkNotNull(nameResolverProvider, "nameResolverProvider");
1✔
573
    ScParser serviceConfigParser =
1✔
574
        new ScParser(
575
            retryEnabled,
576
            builder.maxRetryAttempts,
577
            builder.maxHedgedAttempts,
578
            loadBalancerFactory);
579
    this.authorityOverride = builder.authorityOverride;
1✔
580
    this.metricRecorder = new MetricRecorderImpl(builder.metricSinks,
1✔
581
        MetricInstrumentRegistry.getDefaultRegistry());
1✔
582
    NameResolver.Args.Builder nameResolverArgsBuilder = NameResolver.Args.newBuilder()
1✔
583
            .setDefaultPort(builder.getDefaultPort())
1✔
584
            .setProxyDetector(proxyDetector)
1✔
585
            .setSynchronizationContext(syncContext)
1✔
586
            .setScheduledExecutorService(scheduledExecutor)
1✔
587
            .setServiceConfigParser(serviceConfigParser)
1✔
588
            .setChannelLogger(channelLogger)
1✔
589
            .setOffloadExecutor(this.offloadExecutorHolder)
1✔
590
            .setOverrideAuthority(this.authorityOverride)
1✔
591
            .setMetricRecorder(this.metricRecorder)
1✔
592
            .setNameResolverRegistry(builder.nameResolverRegistry);
1✔
593
    builder.copyAllNameResolverCustomArgsTo(nameResolverArgsBuilder);
1✔
594
    this.nameResolverArgs = nameResolverArgsBuilder.build();
1✔
595
    this.nameResolver = getNameResolver(
1✔
596
        targetUri, authorityOverride, nameResolverProvider, nameResolverArgs);
597
    this.balancerRpcExecutorHolder = new ExecutorHolder(
1✔
598
        checkNotNull(balancerRpcExecutorPool, "balancerRpcExecutorPool"));
1✔
599
    this.delayedTransport = new DelayedClientTransport(this.executor, this.syncContext);
1✔
600
    this.delayedTransport.start(delayedTransportListener);
1✔
601
    this.backoffPolicyProvider = backoffPolicyProvider;
1✔
602

603
    if (builder.defaultServiceConfig != null) {
1✔
604
      ConfigOrError parsedDefaultServiceConfig =
1✔
605
          serviceConfigParser.parseServiceConfig(builder.defaultServiceConfig);
1✔
606
      checkState(
1✔
607
          parsedDefaultServiceConfig.getError() == null,
1✔
608
          "Default config is invalid: %s",
609
          parsedDefaultServiceConfig.getError());
1✔
610
      this.defaultServiceConfig =
1✔
611
          (ManagedChannelServiceConfig) parsedDefaultServiceConfig.getConfig();
1✔
612
      this.transportProvider.throttle = this.defaultServiceConfig.getRetryThrottling();
1✔
613
    } else {
1✔
614
      this.defaultServiceConfig = null;
1✔
615
    }
616
    this.lookUpServiceConfig = builder.lookUpServiceConfig;
1✔
617
    realChannel = new RealChannel(nameResolver.getServiceAuthority());
1✔
618
    Channel channel = realChannel;
1✔
619
    if (builder.binlog != null) {
1✔
620
      channel = builder.binlog.wrapChannel(channel);
1✔
621
    }
622
    this.interceptorChannel = ClientInterceptors.intercept(channel, interceptors);
1✔
623
    this.transportFilters = new ArrayList<>(builder.transportFilters);
1✔
624
    this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier");
1✔
625
    if (builder.idleTimeoutMillis == IDLE_TIMEOUT_MILLIS_DISABLE) {
1✔
626
      this.idleTimeoutMillis = builder.idleTimeoutMillis;
1✔
627
    } else {
628
      checkArgument(
1✔
629
          builder.idleTimeoutMillis
630
              >= ManagedChannelImplBuilder.IDLE_MODE_MIN_TIMEOUT_MILLIS,
631
          "invalid idleTimeoutMillis %s", builder.idleTimeoutMillis);
632
      this.idleTimeoutMillis = builder.idleTimeoutMillis;
1✔
633
    }
634

635
    idleTimer = new Rescheduler(
1✔
636
        new IdleModeTimer(),
637
        syncContext,
638
        transportFactory.getScheduledExecutorService(),
1✔
639
        stopwatchSupplier.get());
1✔
640
    this.fullStreamDecompression = builder.fullStreamDecompression;
1✔
641
    this.decompressorRegistry = checkNotNull(builder.decompressorRegistry, "decompressorRegistry");
1✔
642
    this.compressorRegistry = checkNotNull(builder.compressorRegistry, "compressorRegistry");
1✔
643
    this.userAgent = builder.userAgent;
1✔
644

645
    this.channelBufferLimit = builder.retryBufferSize;
1✔
646
    this.perRpcBufferLimit = builder.perRpcBufferLimit;
1✔
647
    final class ChannelCallTracerFactory implements CallTracer.Factory {
1✔
648
      @Override
649
      public CallTracer create() {
650
        return new CallTracer(timeProvider);
1✔
651
      }
652
    }
653

654
    this.callTracerFactory = new ChannelCallTracerFactory();
1✔
655
    channelCallTracer = callTracerFactory.create();
1✔
656
    this.channelz = checkNotNull(builder.channelz);
1✔
657
    channelz.addRootChannel(this);
1✔
658

659
    if (!lookUpServiceConfig) {
1✔
660
      if (defaultServiceConfig != null) {
1✔
661
        channelLogger.log(
1✔
662
            ChannelLogLevel.INFO, "Service config look-up disabled, using default service config");
663
      }
664
      serviceConfigUpdated = true;
1✔
665
    }
666
  }
1✔
667

668
  @VisibleForTesting
669
  static NameResolver getNameResolver(
670
      URI targetUri, @Nullable final String overrideAuthority,
671
      NameResolverProvider provider, NameResolver.Args nameResolverArgs) {
672
    NameResolver resolver = provider.newNameResolver(targetUri, nameResolverArgs);
1✔
673
    if (resolver == null) {
1✔
674
      throw new IllegalArgumentException("cannot create a NameResolver for " + targetUri);
1✔
675
    }
676

677
    // We wrap the name resolver in a RetryingNameResolver to give it the ability to retry failures.
678
    // TODO: After a transition period, all NameResolver implementations that need retry should use
679
    //       RetryingNameResolver directly and this step can be removed.
680
    NameResolver usedNameResolver = RetryingNameResolver.wrap(resolver, nameResolverArgs);
1✔
681

682
    if (overrideAuthority == null) {
1✔
683
      return usedNameResolver;
1✔
684
    }
685

686
    return new ForwardingNameResolver(usedNameResolver) {
1✔
687
      @Override
688
      public String getServiceAuthority() {
689
        return overrideAuthority;
1✔
690
      }
691
    };
692
  }
693

694
  @VisibleForTesting
695
  InternalConfigSelector getConfigSelector() {
696
    return realChannel.configSelector.get();
1✔
697
  }
698
  
699
  @VisibleForTesting
700
  boolean hasThrottle() {
701
    return this.transportProvider.throttle != null;
1✔
702
  }
703

704
  /**
705
   * Initiates an orderly shutdown in which preexisting calls continue but new calls are immediately
706
   * cancelled.
707
   */
708
  @Override
709
  public ManagedChannelImpl shutdown() {
710
    channelLogger.log(ChannelLogLevel.DEBUG, "shutdown() called");
1✔
711
    if (!shutdown.compareAndSet(false, true)) {
1✔
712
      return this;
1✔
713
    }
714
    final class Shutdown implements Runnable {
1✔
715
      @Override
716
      public void run() {
717
        channelLogger.log(ChannelLogLevel.INFO, "Entering SHUTDOWN state");
1✔
718
        channelStateManager.gotoState(SHUTDOWN);
1✔
719
      }
1✔
720
    }
721

722
    syncContext.execute(new Shutdown());
1✔
723
    realChannel.shutdown();
1✔
724
    final class CancelIdleTimer implements Runnable {
1✔
725
      @Override
726
      public void run() {
727
        cancelIdleTimer(/* permanent= */ true);
1✔
728
      }
1✔
729
    }
730

731
    syncContext.execute(new CancelIdleTimer());
1✔
732
    return this;
1✔
733
  }
734

735
  /**
736
   * Initiates a forceful shutdown in which preexisting and new calls are cancelled. Although
737
   * forceful, the shutdown process is still not instantaneous; {@link #isTerminated()} will likely
738
   * return {@code false} immediately after this method returns.
739
   */
740
  @Override
741
  public ManagedChannelImpl shutdownNow() {
742
    channelLogger.log(ChannelLogLevel.DEBUG, "shutdownNow() called");
1✔
743
    shutdown();
1✔
744
    realChannel.shutdownNow();
1✔
745
    final class ShutdownNow implements Runnable {
1✔
746
      @Override
747
      public void run() {
748
        if (shutdownNowed) {
1✔
749
          return;
1✔
750
        }
751
        shutdownNowed = true;
1✔
752
        maybeShutdownNowSubchannels();
1✔
753
      }
1✔
754
    }
755

756
    syncContext.execute(new ShutdownNow());
1✔
757
    return this;
1✔
758
  }
759

760
  // Called from syncContext
761
  @VisibleForTesting
762
  void panic(final Throwable t) {
763
    if (panicMode) {
1✔
764
      // Preserve the first panic information
765
      return;
×
766
    }
767
    panicMode = true;
1✔
768
    try {
769
      cancelIdleTimer(/* permanent= */ true);
1✔
770
      shutdownNameResolverAndLoadBalancer(false);
1✔
771
    } finally {
772
      updateSubchannelPicker(new LoadBalancer.FixedResultPicker(PickResult.withDrop(
1✔
773
          Status.INTERNAL.withDescription("Panic! This is a bug!").withCause(t))));
1✔
774
      realChannel.updateConfigSelector(null);
1✔
775
      channelLogger.log(ChannelLogLevel.ERROR, "PANIC! Entering TRANSIENT_FAILURE");
1✔
776
      channelStateManager.gotoState(TRANSIENT_FAILURE);
1✔
777
    }
778
  }
1✔
779

780
  @VisibleForTesting
781
  boolean isInPanicMode() {
782
    return panicMode;
1✔
783
  }
784

785
  // Called from syncContext
786
  private void updateSubchannelPicker(SubchannelPicker newPicker) {
787
    delayedTransport.reprocess(newPicker);
1✔
788
  }
1✔
789

790
  @Override
791
  public boolean isShutdown() {
792
    return shutdown.get();
1✔
793
  }
794

795
  @Override
796
  public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
797
    return terminatedLatch.await(timeout, unit);
1✔
798
  }
799

800
  @Override
801
  public boolean isTerminated() {
802
    return terminated;
1✔
803
  }
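
The shutdown methods above use a compare-and-set on an atomic flag so that only the first caller performs the state transition, and awaitTermination() blocks on a latch that is counted down once the channel terminates. The following is a minimal standalone sketch of that pattern, assuming nothing beyond java.util.concurrent. The class ShutdownSketch, its transition counter, and the decision to terminate immediately are hypothetical simplifications, not grpc-java code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a channel; illustrative only, not part of grpc-java.
final class ShutdownSketch {
  private final AtomicBoolean shutdown = new AtomicBoolean();
  private final CountDownLatch terminatedLatch = new CountDownLatch(1);
  private final AtomicInteger transitions = new AtomicInteger();

  ShutdownSketch shutdown() {
    // Only the caller that flips false -> true runs the shutdown work.
    if (!shutdown.compareAndSet(false, true)) {
      return this;
    }
    transitions.incrementAndGet();
    // Assumption: nothing is outstanding in this sketch, so terminate right away.
    // The real channel terminates later, once its subchannels are gone.
    terminatedLatch.countDown();
    return this;
  }

  boolean isShutdown() {
    return shutdown.get();
  }

  int transitionCount() {
    return transitions.get();
  }

  boolean awaitTermination(long timeout, TimeUnit unit) {
    try {
      return terminatedLatch.await(timeout, unit);
    } catch (InterruptedException e) {
      // Restore the interrupt flag and report "not terminated" to the caller.
      Thread.currentThread().interrupt();
      return false;
    }
  }
}
```

Calling shutdown() twice on the same instance leaves transitionCount() at 1, which is the property the compare-and-set guard is there to provide.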
804

805
  /**
806
   * Creates a new outgoing call on the channel.
807
   */
808
  @Override
809
  public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(MethodDescriptor<ReqT, RespT> method,
810
      CallOptions callOptions) {
811
    return interceptorChannel.newCall(method, callOptions);
1✔
812
  }
813

814
  @Override
815
  public String authority() {
816
    return interceptorChannel.authority();
1✔
817
  }
818

819
  private Executor getCallExecutor(CallOptions callOptions) {
820
    Executor executor = callOptions.getExecutor();
1✔
821
    if (executor == null) {
1✔
822
      executor = this.executor;
1✔
823
    }
824
    return executor;
1✔
825
  }
826

827
  private class RealChannel extends Channel {
828
    // Holds null if no config selector is available from the resolution result.
829
    // Reference must be set() from syncContext
830
    private final AtomicReference<InternalConfigSelector> configSelector =
1✔
831
        new AtomicReference<>(INITIAL_PENDING_SELECTOR);
1✔
832
    // Set when the NameResolver is initially created. When we create a new NameResolver for the
833
    // same target, the new instance must have the same value.
834
    private final String authority;
835

836
    private final Channel clientCallImplChannel = new Channel() {
1✔
837
      @Override
838
      public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
839
          MethodDescriptor<RequestT, ResponseT> method, CallOptions callOptions) {
840
        return new ClientCallImpl<>(
1✔
841
            method,
842
            getCallExecutor(callOptions),
1✔
843
            callOptions,
844
            transportProvider,
1✔
845
            terminated ? null : transportFactory.getScheduledExecutorService(),
1✔
846
            channelCallTracer,
1✔
847
            null)
848
            .setFullStreamDecompression(fullStreamDecompression)
1✔
849
            .setDecompressorRegistry(decompressorRegistry)
1✔
850
            .setCompressorRegistry(compressorRegistry);
1✔
851
      }
852

853
      @Override
854
      public String authority() {
855
        return authority;
×
856
      }
857
    };
858

859
    private RealChannel(String authority) {
1✔
860
      this.authority = checkNotNull(authority, "authority");
1✔
861
    }
1✔
862

863
    @Override
864
    public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(
865
        MethodDescriptor<ReqT, RespT> method, CallOptions callOptions) {
866
      if (configSelector.get() != INITIAL_PENDING_SELECTOR) {
1✔
867
        return newClientCall(method, callOptions);
1✔
868
      }
869
      syncContext.execute(new Runnable() {
1✔
870
        @Override
871
        public void run() {
872
          exitIdleMode();
1✔
873
        }
1✔
874
      });
875
      if (configSelector.get() != INITIAL_PENDING_SELECTOR) {
1✔
876
        // This is an optimization for the case (typically with InProcessTransport) when name
877
        // resolution result is immediately available at this point. Otherwise, some users'
878
        // tests might observe a slight behavior difference from earlier gRPC versions.
879
        return newClientCall(method, callOptions);
1✔
880
      }
881
      if (shutdown.get()) {
1✔
882
        // Return a failing ClientCall.
883
        return new ClientCall<ReqT, RespT>() {
×
884
          @Override
885
          public void start(Listener<RespT> responseListener, Metadata headers) {
886
            responseListener.onClose(SHUTDOWN_STATUS, new Metadata());
×
887
          }
×
888

889
          @Override public void request(int numMessages) {}
×
890

891
          @Override public void cancel(@Nullable String message, @Nullable Throwable cause) {}
×
892

893
          @Override public void halfClose() {}
×
894

895
          @Override public void sendMessage(ReqT message) {}
×
896
        };
897
      }
898
      Context context = Context.current();
1✔
899
      final PendingCall<ReqT, RespT> pendingCall = new PendingCall<>(context, method, callOptions);
1✔
900
      syncContext.execute(new Runnable() {
1✔
901
        @Override
902
        public void run() {
903
          if (configSelector.get() == INITIAL_PENDING_SELECTOR) {
1✔
904
            if (pendingCalls == null) {
1✔
905
              pendingCalls = new LinkedHashSet<>();
1✔
906
              inUseStateAggregator.updateObjectInUse(pendingCallsInUseObject, true);
1✔
907
            }
908
            pendingCalls.add(pendingCall);
1✔
909
          } else {
910
            pendingCall.reprocess();
1✔
911
          }
912
        }
1✔
913
      });
914
      return pendingCall;
1✔
915
    }
916

917
    // Must run in SynchronizationContext.
918
    void updateConfigSelector(@Nullable InternalConfigSelector config) {
919
      InternalConfigSelector prevConfig = configSelector.get();
1✔
920
      configSelector.set(config);
1✔
921
      if (prevConfig == INITIAL_PENDING_SELECTOR && pendingCalls != null) {
1✔
922
        for (RealChannel.PendingCall<?, ?> pendingCall : pendingCalls) {
1✔
923
          pendingCall.reprocess();
1✔
924
        }
1✔
925
      }
926
    }
1✔
927

928
    // Must run in SynchronizationContext.
929
    void onConfigError() {
930
      if (configSelector.get() == INITIAL_PENDING_SELECTOR) {
1✔
931
        // Apply Default Service Config if initial name resolution fails.
932
        if (defaultServiceConfig != null) {
1✔
933
          updateConfigSelector(defaultServiceConfig.getDefaultConfigSelector());
1✔
934
          lastServiceConfig = defaultServiceConfig;
1✔
935
          channelLogger.log(ChannelLogLevel.ERROR,
1✔
936
              "Initial Name Resolution error, using default service config");
937
        } else {
938
          updateConfigSelector(null);
1✔
939
        }
940
      }
941
    }
1✔
942

943
    void shutdown() {
944
      final class RealChannelShutdown implements Runnable {
1✔
945
        @Override
946
        public void run() {
947
          if (pendingCalls == null) {
1✔
948
            if (configSelector.get() == INITIAL_PENDING_SELECTOR) {
1✔
949
              configSelector.set(null);
1✔
950
            }
951
            uncommittedRetriableStreamsRegistry.onShutdown(SHUTDOWN_STATUS);
1✔
952
          }
953
        }
1✔
954
      }
955

956
      syncContext.execute(new RealChannelShutdown());
1✔
957
    }
1✔
958

959
    void shutdownNow() {
960
      final class RealChannelShutdownNow implements Runnable {
1✔
961
        @Override
962
        public void run() {
963
          if (configSelector.get() == INITIAL_PENDING_SELECTOR) {
1✔
964
            configSelector.set(null);
1✔
965
          }
966
          if (pendingCalls != null) {
1✔
967
            for (RealChannel.PendingCall<?, ?> pendingCall : pendingCalls) {
1✔
968
              pendingCall.cancel("Channel is forcefully shutdown", null);
1✔
969
            }
1✔
970
          }
971
          uncommittedRetriableStreamsRegistry.onShutdownNow(SHUTDOWN_NOW_STATUS);
1✔
972
        }
1✔
973
      }
974

975
      syncContext.execute(new RealChannelShutdownNow());
1✔
976
    }
1✔
977

978
    @Override
979
    public String authority() {
980
      return authority;
1✔
981
    }
982

983
    private final class PendingCall<ReqT, RespT> extends DelayedClientCall<ReqT, RespT> {
984
      final Context context;
985
      final MethodDescriptor<ReqT, RespT> method;
986
      final CallOptions callOptions;
987
      private final long callCreationTime;
988

989
      PendingCall(
990
          Context context, MethodDescriptor<ReqT, RespT> method, CallOptions callOptions) {
1✔
991
        super(getCallExecutor(callOptions), scheduledExecutor, callOptions.getDeadline());
1✔
992
        this.context = context;
1✔
993
        this.method = method;
1✔
994
        this.callOptions = callOptions;
1✔
995
        this.callCreationTime = ticker.nanoTime();
1✔
996
      }
1✔
997

998
      /** Called when it's ready to create a real call and reprocess the pending call. */
999
      void reprocess() {
1000
        ClientCall<ReqT, RespT> realCall;
1001
        Context previous = context.attach();
1✔
1002
        try {
1003
          CallOptions delayResolutionOption = callOptions.withOption(NAME_RESOLUTION_DELAYED,
1✔
1004
              ticker.nanoTime() - callCreationTime);
1✔
1005
          realCall = newClientCall(method, delayResolutionOption);
1✔
1006
        } finally {
1007
          context.detach(previous);
1✔
1008
        }
1009
        Runnable toRun = setCall(realCall);
1✔
1010
        if (toRun == null) {
1✔
1011
          syncContext.execute(new PendingCallRemoval());
1✔
1012
        } else {
1013
          getCallExecutor(callOptions).execute(new Runnable() {
1✔
1014
            @Override
1015
            public void run() {
1016
              toRun.run();
1✔
1017
              syncContext.execute(new PendingCallRemoval());
1✔
1018
            }
1✔
1019
          });
1020
        }
1021
      }
1✔
1022

1023
      @Override
1024
      protected void callCancelled() {
1025
        super.callCancelled();
1✔
1026
        syncContext.execute(new PendingCallRemoval());
1✔
1027
      }
1✔
1028

1029
      final class PendingCallRemoval implements Runnable {
1✔
1030
        @Override
1031
        public void run() {
1032
          if (pendingCalls != null) {
1✔
1033
            pendingCalls.remove(PendingCall.this);
1✔
1034
            if (pendingCalls.isEmpty()) {
1✔
1035
              inUseStateAggregator.updateObjectInUse(pendingCallsInUseObject, false);
1✔
1036
              pendingCalls = null;
1✔
1037
              if (shutdown.get()) {
1✔
1038
                uncommittedRetriableStreamsRegistry.onShutdown(SHUTDOWN_STATUS);
1✔
1039
              }
1040
            }
1041
          }
1042
        }
1✔
1043
      }
1044
    }
1045

1046
    private <ReqT, RespT> ClientCall<ReqT, RespT> newClientCall(
1047
        MethodDescriptor<ReqT, RespT> method, CallOptions callOptions) {
1048
      InternalConfigSelector selector = configSelector.get();
1✔
1049
      if (selector == null) {
1✔
1050
        return clientCallImplChannel.newCall(method, callOptions);
1✔
1051
      }
1052
      if (selector instanceof ServiceConfigConvertedSelector) {
1✔
1053
        MethodInfo methodInfo =
1✔
1054
            ((ServiceConfigConvertedSelector) selector).config.getMethodConfig(method);
1✔
1055
        if (methodInfo != null) {
1✔
1056
          callOptions = callOptions.withOption(MethodInfo.KEY, methodInfo);
1✔
1057
        }
1058
        return clientCallImplChannel.newCall(method, callOptions);
1✔
1059
      }
1060
      return new ConfigSelectingClientCall<>(
1✔
1061
          selector, clientCallImplChannel, executor, method, callOptions);
1✔
1062
    }
1063
  }
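
RealChannel buffers calls that arrive before the first name resolution result and replays them once a config selector (possibly null) replaces the INITIAL_PENDING_SELECTOR sentinel. The following is a rough single-threaded sketch of that idea. PendingCallSketch and its string-based "calls" are hypothetical, and it clears the buffer in one step, whereas the listing above removes each pending call individually on syncContext.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, single-threaded sketch; the real channel serializes these steps on syncContext.
final class PendingCallSketch {
  // Sentinel compared by identity, standing in for INITIAL_PENDING_SELECTOR.
  private static final Object PENDING = new Object();

  private final AtomicReference<Object> config = new AtomicReference<>(PENDING);
  private Set<String> pendingCalls;  // null while nothing is buffered
  final List<String> started = new ArrayList<>();

  void newCall(String name) {
    Object current = config.get();
    if (current != PENDING) {
      // A config (or null) is already known, so the call starts immediately.
      started.add(name + "@" + current);
      return;
    }
    if (pendingCalls == null) {
      pendingCalls = new LinkedHashSet<>();  // keeps arrival order for replay
    }
    pendingCalls.add(name);
  }

  void updateConfig(Object newConfig) {
    Object prev = config.getAndSet(newConfig);
    // Replay buffered calls only on the first transition away from the sentinel.
    if (prev == PENDING && pendingCalls != null) {
      for (String name : pendingCalls) {
        started.add(name + "@" + newConfig);
      }
      pendingCalls = null;
    }
  }
}
```

The LinkedHashSet mirrors the choice in the listing: replay happens in the order the calls were created.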
1064

1065
  /**
1066
   * A client call for a given channel that applies a given config selector when it starts.
1067
   */
1068
  static final class ConfigSelectingClientCall<ReqT, RespT>
1069
      extends ForwardingClientCall<ReqT, RespT> {
1070

1071
    private final InternalConfigSelector configSelector;
1072
    private final Channel channel;
1073
    private final Executor callExecutor;
1074
    private final MethodDescriptor<ReqT, RespT> method;
1075
    private final Context context;
1076
    private CallOptions callOptions;
1077

1078
    private ClientCall<ReqT, RespT> delegate;
1079

1080
    ConfigSelectingClientCall(
1081
        InternalConfigSelector configSelector, Channel channel, Executor channelExecutor,
1082
        MethodDescriptor<ReqT, RespT> method,
1083
        CallOptions callOptions) {
1✔
1084
      this.configSelector = configSelector;
1✔
1085
      this.channel = channel;
1✔
1086
      this.method = method;
1✔
1087
      this.callExecutor =
1✔
1088
          callOptions.getExecutor() == null ? channelExecutor : callOptions.getExecutor();
1✔
1089
      this.callOptions = callOptions.withExecutor(callExecutor);
1✔
1090
      this.context = Context.current();
1✔
1091
    }
1✔
1092

1093
    @Override
1094
    protected ClientCall<ReqT, RespT> delegate() {
1095
      return delegate;
1✔
1096
    }
1097

1098
    @SuppressWarnings("unchecked")
1099
    @Override
1100
    public void start(Listener<RespT> observer, Metadata headers) {
1101
      PickSubchannelArgs args =
1✔
1102
          new PickSubchannelArgsImpl(method, headers, callOptions, NOOP_PICK_DETAILS_CONSUMER);
1✔
1103
      InternalConfigSelector.Result result = configSelector.selectConfig(args);
1✔
1104
      Status status = result.getStatus();
1✔
1105
      if (!status.isOk()) {
1✔
1106
        executeCloseObserverInContext(observer,
1✔
1107
            GrpcUtil.replaceInappropriateControlPlaneStatus(status));
1✔
1108
        delegate = (ClientCall<ReqT, RespT>) NOOP_CALL;
1✔
1109
        return;
1✔
1110
      }
1111
      ClientInterceptor interceptor = result.getInterceptor();
1✔
1112
      ManagedChannelServiceConfig config = (ManagedChannelServiceConfig) result.getConfig();
1✔
1113
      MethodInfo methodInfo = config.getMethodConfig(method);
1✔
1114
      if (methodInfo != null) {
1✔
1115
        callOptions = callOptions.withOption(MethodInfo.KEY, methodInfo);
1✔
1116
      }
1117
      if (interceptor != null) {
1✔
1118
        delegate = interceptor.interceptCall(method, callOptions, channel);
1✔
1119
      } else {
1120
        delegate = channel.newCall(method, callOptions);
×
1121
      }
1122
      delegate.start(observer, headers);
1✔
1123
    }
1✔
1124

1125
    private void executeCloseObserverInContext(
1126
        final Listener<RespT> observer, final Status status) {
1127
      class CloseInContext extends ContextRunnable {
1128
        CloseInContext() {
1✔
1129
          super(context);
1✔
1130
        }
1✔
1131

1132
        @Override
1133
        public void runInContext() {
1134
          observer.onClose(status, new Metadata());
1✔
1135
        }
1✔
1136
      }
1137

1138
      callExecutor.execute(new CloseInContext());
1✔
1139
    }
1✔
1140

1141
    @Override
1142
    public void cancel(@Nullable String message, @Nullable Throwable cause) {
1143
      if (delegate != null) {
×
1144
        delegate.cancel(message, cause);
×
1145
      }
1146
    }
×
1147
  }
1148

1149
  private static final ClientCall<Object, Object> NOOP_CALL = new ClientCall<Object, Object>() {
1✔
1150
    @Override
1151
    public void start(Listener<Object> responseListener, Metadata headers) {}
×
1152

1153
    @Override
1154
    public void request(int numMessages) {}
1✔
1155

1156
    @Override
1157
    public void cancel(String message, Throwable cause) {}
×
1158

1159
    @Override
1160
    public void halfClose() {}
×
1161

1162
    @Override
1163
    public void sendMessage(Object message) {}
×
1164

1165
    // Always returns {@code false}, since this is only used when the startup of the call fails.
1166
    @Override
1167
    public boolean isReady() {
1168
      return false;
×
1169
    }
1170
  };
1171

1172
  /**
1173
   * Terminates the channel if the termination conditions are met.
1174
   */
1175
  // Must be run from syncContext
1176
  private void maybeTerminateChannel() {
1177
    if (terminated) {
1✔
1178
      return;
×
1179
    }
1180
    if (shutdown.get() && subchannels.isEmpty()) {
1✔
1181
      channelLogger.log(ChannelLogLevel.INFO, "Terminated");
1✔
1182
      channelz.removeRootChannel(this);
1✔
1183
      executorPool.returnObject(executor);
1✔
1184
      balancerRpcExecutorHolder.release();
1✔
1185
      offloadExecutorHolder.release();
1✔
1186
      // Release the transport factory so that it can deallocate any resources.
1187
      transportFactory.close();
1✔
1188

1189
      terminated = true;
1✔
1190
      terminatedLatch.countDown();
1✔
1191
    }
1192
  }
1✔
1193

1194
  @Override
1195
  public ConnectivityState getState(boolean requestConnection) {
1196
    ConnectivityState savedChannelState = channelStateManager.getState();
1✔
1197
    if (requestConnection && savedChannelState == IDLE) {
1✔
1198
      final class RequestConnection implements Runnable {
1✔
1199
        @Override
1200
        public void run() {
1201
          exitIdleMode();
1✔
1202
          if (lbHelper != null) {
1✔
1203
            lbHelper.lb.requestConnection();
1✔
1204
          }
1205
        }
1✔
1206
      }
1207

1208
      syncContext.execute(new RequestConnection());
1✔
1209
    }
1210
    return savedChannelState;
1✔
1211
  }
1212

1213
  @Override
1214
  public void notifyWhenStateChanged(final ConnectivityState source, final Runnable callback) {
1215
    final class NotifyStateChanged implements Runnable {
1✔
1216
      @Override
1217
      public void run() {
1218
        channelStateManager.notifyWhenStateChanged(callback, executor, source);
1✔
1219
      }
1✔
1220
    }
1221

1222
    syncContext.execute(new NotifyStateChanged());
1✔
1223
  }
1✔
1224

1225
  @Override
1226
  public void resetConnectBackoff() {
1227
    final class ResetConnectBackoff implements Runnable {
1✔
1228
      @Override
1229
      public void run() {
1230
        if (shutdown.get()) {
1✔
1231
          return;
1✔
1232
        }
1233
        if (nameResolverStarted) {
1✔
1234
          refreshNameResolution();
1✔
1235
        }
1236
        for (InternalSubchannel subchannel : subchannels) {
1✔
1237
          subchannel.resetConnectBackoff();
1✔
1238
        }
1✔
1239
      }
1✔
1240
    }
1241

1242
    syncContext.execute(new ResetConnectBackoff());
1✔
1243
  }
1✔
1244

1245
  @Override
1246
  public void enterIdle() {
1247
    final class PrepareToLoseNetworkRunnable implements Runnable {
1✔
1248
      @Override
1249
      public void run() {
1250
        if (shutdown.get() || lbHelper == null) {
1✔
1251
          return;
1✔
1252
        }
1253
        cancelIdleTimer(/* permanent= */ false);
1✔
1254
        enterIdleMode();
1✔
1255
      }
1✔
1256
    }
1257

1258
    syncContext.execute(new PrepareToLoseNetworkRunnable());
1✔
1259
  }
1✔
1260

1261
  /**
1262
   * A registry that prevents channel shutdown from killing existing retry attempts that are in
1263
   * backoff.
1264
   */
1265
  private final class UncommittedRetriableStreamsRegistry {
1✔
1266
    // TODO(zdapeng): This means we would acquire a lock for each new retry-able stream,
1267
    // it's worthwhile to look for a lock-free approach.
1268
    final Object lock = new Object();
1✔
1269

1270
    @GuardedBy("lock")
1✔
1271
    Collection<ClientStream> uncommittedRetriableStreams = new HashSet<>();
1272

1273
    @GuardedBy("lock")
1274
    Status shutdownStatus;
1275

1276
    void onShutdown(Status reason) {
1277
      boolean shouldShutdownDelayedTransport = false;
1✔
1278
      synchronized (lock) {
1✔
1279
        if (shutdownStatus != null) {
1✔
1280
          return;
1✔
1281
        }
1282
        shutdownStatus = reason;
1✔
1283
        // Keep the delayedTransport open until there are no more uncommitted streams, b/c those
1284
        // retriable streams, which may be in backoff and not using any transport, are already
1285
        // started RPCs.
1286
        if (uncommittedRetriableStreams.isEmpty()) {
1✔
1287
          shouldShutdownDelayedTransport = true;
1✔
1288
        }
1289
      }
1✔
1290

1291
      if (shouldShutdownDelayedTransport) {
1✔
1292
        delayedTransport.shutdown(reason);
1✔
1293
      }
1294
    }
1✔
1295

1296
    void onShutdownNow(Status reason) {
1297
      onShutdown(reason);
1✔
1298
      Collection<ClientStream> streams;
1299

1300
      synchronized (lock) {
1✔
1301
        streams = new ArrayList<>(uncommittedRetriableStreams);
1✔
1302
      }
1✔
1303

1304
      for (ClientStream stream : streams) {
1✔
1305
        stream.cancel(reason);
1✔
1306
      }
1✔
1307
      delayedTransport.shutdownNow(reason);
1✔
1308
    }
1✔
1309

1310
    /**
1311
     * Registers a RetriableStream and returns null if not shut down, otherwise just returns the
1312
     * shutdown Status.
1313
     */
1314
    @Nullable
1315
    Status add(RetriableStream<?> retriableStream) {
1316
      synchronized (lock) {
1✔
1317
        if (shutdownStatus != null) {
1✔
1318
          return shutdownStatus;
1✔
1319
        }
1320
        uncommittedRetriableStreams.add(retriableStream);
1✔
1321
        return null;
1✔
1322
      }
1323
    }
1324

1325
    void remove(RetriableStream<?> retriableStream) {
1326
      Status shutdownStatusCopy = null;
1✔
1327

1328
      synchronized (lock) {
1✔
1329
        uncommittedRetriableStreams.remove(retriableStream);
1✔
1330
        if (uncommittedRetriableStreams.isEmpty()) {
1✔
1331
          shutdownStatusCopy = shutdownStatus;
1✔
1332
          // Because retriable transport is long-lived, we take this opportunity to down-size the
1333
          // hash set.
1334
          uncommittedRetriableStreams = new HashSet<>();
1✔
1335
        }
1336
      }
1✔
1337

1338
      if (shutdownStatusCopy != null) {
1✔
1339
        delayedTransport.shutdown(shutdownStatusCopy);
1✔
1340
      }
1341
    }
1✔
1342
  }
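
The registry above defers shutting down the delayed transport until the last uncommitted retriable stream is removed, and refuses new registrations once shutdown has been requested. The following is a simplified sketch of that bookkeeping. DeferredShutdownRegistry is a hypothetical name, streams are plain Objects, and a boolean flag stands in for the call to delayedTransport.shutdown(), which the real code makes outside the lock.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified registry; names are illustrative and not grpc-java API.
final class DeferredShutdownRegistry {
  private final Object lock = new Object();
  private final Set<Object> inFlight = new HashSet<>();  // guarded by lock
  private String shutdownReason;      // guarded by lock; null until shutdown is requested
  private boolean transportShutDown;  // guarded by lock; stands in for the transport shutdown

  /** Returns null if the stream was registered, or the shutdown reason if it was refused. */
  String add(Object stream) {
    synchronized (lock) {
      if (shutdownReason != null) {
        return shutdownReason;
      }
      inFlight.add(stream);
      return null;
    }
  }

  void remove(Object stream) {
    synchronized (lock) {
      inFlight.remove(stream);
      // The last stream to leave completes a shutdown that was requested earlier.
      if (inFlight.isEmpty() && shutdownReason != null) {
        transportShutDown = true;
      }
    }
  }

  void onShutdown(String reason) {
    synchronized (lock) {
      if (shutdownReason != null) {
        return;  // keep the first reason
      }
      shutdownReason = reason;
      if (inFlight.isEmpty()) {
        transportShutDown = true;
      }
    }
  }

  boolean isTransportShutDown() {
    synchronized (lock) {
      return transportShutDown;
    }
  }
}
```

With one stream registered, onShutdown() alone leaves isTransportShutDown() false. It becomes true only after that stream is removed.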
1343

1344
  private final class LbHelperImpl extends LoadBalancer.Helper {
1✔
1345
    AutoConfiguredLoadBalancer lb;
1346

1347
    @Override
1348
    public AbstractSubchannel createSubchannel(CreateSubchannelArgs args) {
1349
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1350
      // No new subchannel should be created after the load balancer has been shut down.
1351
      checkState(!terminating, "Channel is being terminated");
1✔
1352
      return new SubchannelImpl(args);
1✔
1353
    }
1354

1355
    @Override
1356
    public void updateBalancingState(
1357
        final ConnectivityState newState, final SubchannelPicker newPicker) {
1358
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1359
      checkNotNull(newState, "newState");
1✔
1360
      checkNotNull(newPicker, "newPicker");
1✔
1361

1362
      if (LbHelperImpl.this != lbHelper || panicMode) {
1✔
1363
        return;
1✔
1364
      }
1365
      updateSubchannelPicker(newPicker);
1✔
1366
      // It's not appropriate to report SHUTDOWN state from lb.
1367
      // Ignore the case of newState == SHUTDOWN for now.
1368
      if (newState != SHUTDOWN) {
1✔
1369
        channelLogger.log(
1✔
1370
            ChannelLogLevel.INFO, "Entering {0} state with picker: {1}", newState, newPicker);
1371
        channelStateManager.gotoState(newState);
1✔
1372
      }
1373
    }
1✔
1374

1375
    @Override
1376
    public void refreshNameResolution() {
1377
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1378
      final class LoadBalancerRefreshNameResolution implements Runnable {
1✔
1379
        @Override
1380
        public void run() {
1381
          ManagedChannelImpl.this.refreshNameResolution();
1✔
1382
        }
1✔
1383
      }
1384

1385
      syncContext.execute(new LoadBalancerRefreshNameResolution());
1✔
1386
    }
1✔
1387

1388
    @Override
1389
    public ManagedChannel createOobChannel(EquivalentAddressGroup addressGroup, String authority) {
1390
      return createOobChannel(Collections.singletonList(addressGroup), authority);
×
1391
    }
1392

1393
    @Override
1394
    public ManagedChannel createOobChannel(List<EquivalentAddressGroup> addressGroup,
1395
        String authority) {
1396
      NameResolverRegistry nameResolverRegistry = new NameResolverRegistry();
1✔
1397
      OobNameResolverProvider resolverProvider =
1✔
1398
          new OobNameResolverProvider(authority, addressGroup, syncContext);
1399
      nameResolverRegistry.register(resolverProvider);
1✔
1400
      // We could use a hard-coded target, as the name resolver won't actually use this string.
1401
      // However, that would make debugging less clear, as we use the target to identify the
1402
      // channel.
1403
      String target;
1404
      try {
1405
        target = new URI("oob", "", "/" + authority, null, null).toString();
1✔
1406
      } catch (URISyntaxException ex) {
×
1407
        // Any special characters in the path will be percent encoded. So this should be impossible.
1408
        throw new AssertionError(ex);
×
1409
      }
1✔
1410
      ManagedChannel delegate = createResolvingOobChannelBuilder(
1✔
1411
          target, new DefaultChannelCreds(), nameResolverRegistry)
1412
          // TODO(zdapeng): executors should not outlive the parent channel.
1413
          .executor(balancerRpcExecutorHolder.getExecutor())
1✔
1414
          .idleTimeout(Integer.MAX_VALUE, TimeUnit.SECONDS)
1✔
1415
          .disableRetry()
1✔
1416
          .build();
1✔
1417
      return new OobChannel(delegate, resolverProvider);
1✔
1418
    }
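
createOobChannel() builds its target with the five-argument java.net.URI constructor, which percent-encodes characters that are not legal in the path, so the URISyntaxException branch is not expected to run. The following small sketch isolates that construction. OobTargetSketch is a hypothetical helper and the sample authorities are made up for illustration.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical helper mirroring the target construction above; not grpc-java API.
final class OobTargetSketch {
  static String target(String authority) {
    try {
      // Empty URI authority, with the OOB authority carried in the path component.
      // The multi-argument URI constructors quote illegal characters in the path.
      return new URI("oob", "", "/" + authority, null, null).toString();
    } catch (URISyntaxException ex) {
      // Not expected for a path that starts with '/', as noted in the listing.
      throw new AssertionError(ex);
    }
  }
}
```

For an authority such as lb.example.com this yields a target of the form oob:///lb.example.com, and a space in the authority appears as %20 in the path.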
1419

1420
    @Deprecated
1421
    @Override
1422
    public ManagedChannelBuilder<?> createResolvingOobChannelBuilder(String target) {
1423
      return createResolvingOobChannelBuilder(target, new DefaultChannelCreds())
1✔
1424
          // Override authority to keep the old behavior.
1425
          // createResolvingOobChannelBuilder(String target) will be deleted soon.
1426
          .overrideAuthority(getAuthority());
1✔
1427
    }
1428

1429
    @Override
1430
    public ManagedChannelBuilder<?> createResolvingOobChannelBuilder(
1431
        final String target, final ChannelCredentials channelCreds) {
1432
      return createResolvingOobChannelBuilder(target, channelCreds, nameResolverRegistry);
1✔
1433
    }
1434

1435
    // TODO(creamsoup) prevent main channel from shutting down if oob channel is not terminated
1436
    // TODO(zdapeng) register the channel as a subchannel of the parent channel in channelz.
1437
    private ManagedChannelBuilder<?> createResolvingOobChannelBuilder(
1438
        final String target, final ChannelCredentials channelCreds,
1439
        NameResolverRegistry nameResolverRegistry) {
1440
      checkNotNull(channelCreds, "channelCreds");
1✔
1441

1442
      final class ResolvingOobChannelBuilder
1443
          extends ForwardingChannelBuilder2<ResolvingOobChannelBuilder> {
1444
        final ManagedChannelBuilder<?> delegate;
1445

1446
        ResolvingOobChannelBuilder() {
1✔
1447
          final ClientTransportFactory transportFactory;
1448
          CallCredentials callCredentials;
1449
          if (channelCreds instanceof DefaultChannelCreds) {
1✔
1450
            transportFactory = originalTransportFactory;
1✔
1451
            callCredentials = null;
1✔
1452
          } else {
1453
            SwapChannelCredentialsResult swapResult =
1✔
1454
                originalTransportFactory.swapChannelCredentials(channelCreds);
1✔
1455
            if (swapResult == null) {
1✔
1456
              delegate = Grpc.newChannelBuilder(target, channelCreds);
×
1457
              return;
×
1458
            } else {
1459
              transportFactory = swapResult.transportFactory;
1✔
1460
              callCredentials = swapResult.callCredentials;
1✔
1461
            }
1462
          }
1463
          ClientTransportFactoryBuilder transportFactoryBuilder =
1✔
1464
              new ClientTransportFactoryBuilder() {
1✔
1465
                @Override
1466
                public ClientTransportFactory buildClientTransportFactory() {
1467
                  return transportFactory;
1✔
1468
                }
1469
              };
1470
          delegate = new ManagedChannelImplBuilder(
1✔
1471
              target,
1472
              channelCreds,
1473
              callCredentials,
1474
              transportFactoryBuilder,
1475
              new FixedPortProvider(nameResolverArgs.getDefaultPort()))
1✔
1476
              .nameResolverRegistry(nameResolverRegistry);
1✔
1477
        }
1✔
1478

1479
        @Override
1480
        protected ManagedChannelBuilder<?> delegate() {
1481
          return delegate;
1✔
1482
        }
1483
      }
1484

1485
      checkState(!terminated, "Channel is terminated");
1✔
1486

1487
      ResolvingOobChannelBuilder builder = new ResolvingOobChannelBuilder();
1✔
1488

1489
      return builder
1✔
1490
          // TODO(zdapeng): executors should not outlive the parent channel.
1491
          .executor(executor)
1✔
1492
          .offloadExecutor(offloadExecutorHolder.getExecutor())
1✔
1493
          .maxTraceEvents(maxTraceEvents)
1✔
1494
          .proxyDetector(nameResolverArgs.getProxyDetector())
1✔
1495
          .userAgent(userAgent);
1✔
1496
    }
1497

1498
    @Override
1499
    public ChannelCredentials getUnsafeChannelCredentials() {
1500
      if (originalChannelCreds == null) {
1✔
1501
        return new DefaultChannelCreds();
1✔
1502
      }
1503
      return originalChannelCreds;
×
1504
    }
1505

1506
    @Override
1507
    public void updateOobChannelAddresses(ManagedChannel channel, EquivalentAddressGroup eag) {
1508
      updateOobChannelAddresses(channel, Collections.singletonList(eag));
×
1509
    }
×
1510

1511
    @Override
1512
    public void updateOobChannelAddresses(ManagedChannel channel,
1513
        List<EquivalentAddressGroup> eag) {
1514
      checkArgument(channel instanceof OobChannel,
1✔
1515
          "channel must have been returned from createOobChannel");
1516
      ((OobChannel) channel).updateAddresses(eag);
1✔
1517
    }
1✔
1518

1519
    @Override
1520
    public String getAuthority() {
1521
      return ManagedChannelImpl.this.authority();
1✔
1522
    }
1523

1524
    @Override
1525
    public String getChannelTarget() {
1526
      return targetUri.toString();
1✔
1527
    }
1528

1529
    @Override
1530
    public SynchronizationContext getSynchronizationContext() {
1531
      return syncContext;
1✔
1532
    }
1533

1534
    @Override
1535
    public ScheduledExecutorService getScheduledExecutorService() {
1536
      return scheduledExecutor;
1✔
1537
    }
1538

1539
    @Override
1540
    public ChannelLogger getChannelLogger() {
1541
      return channelLogger;
1✔
1542
    }
1543

1544
    @Override
1545
    public NameResolver.Args getNameResolverArgs() {
1546
      return nameResolverArgs;
1✔
1547
    }
1548

1549
    @Override
1550
    public NameResolverRegistry getNameResolverRegistry() {
1551
      return nameResolverRegistry;
1✔
1552
    }
1553

1554
    @Override
1555
    public MetricRecorder getMetricRecorder() {
1556
      return metricRecorder;
1✔
1557
    }
1558

1559
    /**
1560
     * A placeholder for channel creds if user did not specify channel creds for the channel.
1561
     */
1562
    // TODO(zdapeng): get rid of this class and let all ChannelBuilders always provide a non-null
1563
    //     channel creds.
1564
    final class DefaultChannelCreds extends ChannelCredentials {
1✔
1565
      @Override
1566
      public ChannelCredentials withoutBearerTokens() {
1567
        return this;
×
1568
      }
1569
    }
1570
  }
1571

1572
  static final class OobChannel extends ForwardingManagedChannel {
1573
    private final OobNameResolverProvider resolverProvider;
1574

1575
    public OobChannel(ManagedChannel delegate, OobNameResolverProvider resolverProvider) {
1576
      super(delegate);
1✔
1577
      this.resolverProvider = checkNotNull(resolverProvider, "resolverProvider");
1✔
1578
    }
1✔
1579

1580
    public void updateAddresses(List<EquivalentAddressGroup> eags) {
1581
      resolverProvider.updateAddresses(eags);
1✔
1582
    }
1✔
1583
  }
1584

1585
  final class NameResolverListener extends NameResolver.Listener2 {
1586
    final LbHelperImpl helper;
1587
    final NameResolver resolver;
1588

1589
    NameResolverListener(LbHelperImpl helperImpl, NameResolver resolver) {
1✔
1590
      this.helper = checkNotNull(helperImpl, "helperImpl");
1✔
1591
      this.resolver = checkNotNull(resolver, "resolver");
1✔
1592
    }
1✔
1593

1594
    @Override
1595
    public void onResult(final ResolutionResult resolutionResult) {
1596
      syncContext.execute(() -> onResult2(resolutionResult));
×
1597
    }
×
1598

1599
    @SuppressWarnings("ReferenceEquality")
1600
    @Override
1601
    public Status onResult2(final ResolutionResult resolutionResult) {
1602
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1603
      if (ManagedChannelImpl.this.nameResolver != resolver) {
1✔
1604
        return Status.OK;
1✔
1605
      }
1606

1607
      StatusOr<List<EquivalentAddressGroup>> serversOrError =
1✔
1608
          resolutionResult.getAddressesOrError();
1✔
1609
      if (!serversOrError.hasValue()) {
1✔
1610
        handleErrorInSyncContext(serversOrError.getStatus());
1✔
1611
        return serversOrError.getStatus();
1✔
1612
      }
1613
      List<EquivalentAddressGroup> servers = serversOrError.getValue();
1✔
1614
      channelLogger.log(
1✔
1615
          ChannelLogLevel.DEBUG,
1616
          "Resolved address: {0}, config={1}",
1617
          servers,
1618
          resolutionResult.getAttributes());
1✔
1619

1620
      if (lastResolutionState != ResolutionState.SUCCESS) {
1✔
1621
        channelLogger.log(ChannelLogLevel.INFO, "Address resolved: {0}",
1✔
1622
            servers);
1623
        lastResolutionState = ResolutionState.SUCCESS;
1✔
1624
      }
1625
      ConfigOrError configOrError = resolutionResult.getServiceConfig();
1✔
1626
      InternalConfigSelector resolvedConfigSelector =
1✔
1627
          resolutionResult.getAttributes().get(InternalConfigSelector.KEY);
1✔
1628
      ManagedChannelServiceConfig validServiceConfig =
1629
          configOrError != null && configOrError.getConfig() != null
1✔
1630
              ? (ManagedChannelServiceConfig) configOrError.getConfig()
1✔
1631
              : null;
1✔
1632
      Status serviceConfigError = configOrError != null ? configOrError.getError() : null;
1✔
1633

1634
      ManagedChannelServiceConfig effectiveServiceConfig;
1635
      if (!lookUpServiceConfig) {
1✔
1636
        if (validServiceConfig != null) {
1✔
1637
          channelLogger.log(
1✔
1638
              ChannelLogLevel.INFO,
1639
              "Service config from name resolver discarded by channel settings");
1640
        }
1641
        effectiveServiceConfig =
1642
            defaultServiceConfig == null ? EMPTY_SERVICE_CONFIG : defaultServiceConfig;
1✔
1643
        if (resolvedConfigSelector != null) {
1✔
1644
          channelLogger.log(
1✔
1645
              ChannelLogLevel.INFO,
1646
              "Config selector from name resolver discarded by channel settings");
1647
        }
1648
        realChannel.updateConfigSelector(effectiveServiceConfig.getDefaultConfigSelector());
1✔
1649
      } else {
1650
        // Try to use config if returned from name resolver
1651
        // Otherwise, try to use the default config if available
1652
        if (validServiceConfig != null) {
1✔
1653
          effectiveServiceConfig = validServiceConfig;
1✔
1654
          if (resolvedConfigSelector != null) {
1✔
1655
            realChannel.updateConfigSelector(resolvedConfigSelector);
1✔
1656
            if (effectiveServiceConfig.getDefaultConfigSelector() != null) {
1✔
1657
              channelLogger.log(
×
1658
                  ChannelLogLevel.DEBUG,
1659
                  "Method configs in service config will be discarded due to presence of "
1660
                      + "config-selector");
1661
            }
1662
          } else {
1663
            realChannel.updateConfigSelector(effectiveServiceConfig.getDefaultConfigSelector());
1✔
1664
          }
1665
        } else if (defaultServiceConfig != null) {
1✔
1666
          effectiveServiceConfig = defaultServiceConfig;
1✔
1667
          realChannel.updateConfigSelector(effectiveServiceConfig.getDefaultConfigSelector());
1✔
1668
          channelLogger.log(
1✔
1669
              ChannelLogLevel.INFO,
1670
              "Received no service config, using default service config");
1671
        } else if (serviceConfigError != null) {
1✔
1672
          if (!serviceConfigUpdated) {
1✔
1673
            // First DNS lookup has invalid service config, and cannot fall back to default
1674
            channelLogger.log(
1✔
1675
                ChannelLogLevel.INFO,
1676
                "Fallback to error due to invalid first service config without default config");
1677
            // This error could be an "inappropriate" control plane error that should not bleed
1678
            // through to client code using gRPC. We let them flow through here to the LB as
1679
            // we later check for these error codes when investigating pick results in
1680
            // GrpcUtil.getTransportFromPickResult().
1681
            onError(configOrError.getError());
1✔
1682
            return configOrError.getError();
1✔
1683
          } else {
1684
            effectiveServiceConfig = lastServiceConfig;
1✔
1685
          }
1686
        } else {
1687
          effectiveServiceConfig = EMPTY_SERVICE_CONFIG;
1✔
1688
          realChannel.updateConfigSelector(null);
1✔
1689
        }
1690
        if (!effectiveServiceConfig.equals(lastServiceConfig)) {
1✔
1691
          channelLogger.log(
1✔
1692
              ChannelLogLevel.INFO,
1693
              "Service config changed{0}",
1694
              effectiveServiceConfig == EMPTY_SERVICE_CONFIG ? " to empty" : "");
1✔
1695
          lastServiceConfig = effectiveServiceConfig;
1✔
1696
          transportProvider.throttle = effectiveServiceConfig.getRetryThrottling();
1✔
1697
        }
1698

1699
        try {
1700
          // TODO(creamsoup): when `serversOrError` is empty and lastResolutionStateCopy == SUCCESS
1701
          //  and lbNeedAddress, it shouldn't call the handleServiceConfigUpdate. But,
1702
          //  lbNeedAddress is not deterministic
1703
          serviceConfigUpdated = true;
1✔
1704
        } catch (RuntimeException re) {
×
1705
          logger.log(
×
1706
              Level.WARNING,
1707
              "[" + getLogId() + "] Unexpected exception from parsing service config",
×
1708
              re);
1709
        }
1✔
1710
      }
1711

1712
      Attributes effectiveAttrs = resolutionResult.getAttributes();
1✔
1713
      // Call LB only if it's not shut down.  If LB is shut down, lbHelper won't match.
1714
      if (NameResolverListener.this.helper == ManagedChannelImpl.this.lbHelper) {
1✔
1715
        Attributes.Builder attrBuilder =
1✔
1716
            effectiveAttrs.toBuilder().discard(InternalConfigSelector.KEY);
1✔
1717
        Map<String, ?> healthCheckingConfig =
1✔
1718
            effectiveServiceConfig.getHealthCheckingConfig();
1✔
1719
        if (healthCheckingConfig != null) {
1✔
1720
          attrBuilder
1✔
1721
              .set(LoadBalancer.ATTR_HEALTH_CHECKING_CONFIG, healthCheckingConfig)
1✔
1722
              .build();
1✔
1723
        }
1724
        Attributes attributes = attrBuilder.build();
1✔
1725

1726
        ResolvedAddresses.Builder resolvedAddresses = ResolvedAddresses.newBuilder()
1✔
1727
            .setAddresses(serversOrError.getValue())
1✔
1728
            .setAttributes(attributes)
1✔
1729
            .setLoadBalancingPolicyConfig(effectiveServiceConfig.getLoadBalancingConfig());
1✔
1730
        Status addressAcceptanceStatus = helper.lb.tryAcceptResolvedAddresses(
1✔
1731
            resolvedAddresses.build());
1✔
1732
        return addressAcceptanceStatus;
1✔
1733
      }
1734
      return Status.OK;
×
1735
    }
1736

1737
    @Override
1738
    public void onError(final Status error) {
1739
      checkArgument(!error.isOk(), "the error status must not be OK");
1✔
1740
      final class NameResolverErrorHandler implements Runnable {
1✔
1741
        @Override
1742
        public void run() {
1743
          handleErrorInSyncContext(error);
1✔
1744
        }
1✔
1745
      }
1746

1747
      syncContext.execute(new NameResolverErrorHandler());
1✔
1748
    }
1✔
1749

1750
    private void handleErrorInSyncContext(Status error) {
1751
      logger.log(Level.WARNING, "[{0}] Failed to resolve name. status={1}",
1✔
1752
          new Object[] {getLogId(), error});
1✔
1753
      realChannel.onConfigError();
1✔
1754
      if (lastResolutionState != ResolutionState.ERROR) {
1✔
1755
        channelLogger.log(ChannelLogLevel.WARNING, "Failed to resolve name: {0}", error);
1✔
1756
        lastResolutionState = ResolutionState.ERROR;
1✔
1757
      }
1758
      // Call LB only if it's not shut down.  If LB is shut down, lbHelper won't match.
1759
      if (NameResolverListener.this.helper != ManagedChannelImpl.this.lbHelper) {
1✔
1760
        return;
1✔
1761
      }
1762

1763
      helper.lb.handleNameResolutionError(error);
1✔
1764
    }
1✔
1765
  }
1766

1767
  private final class SubchannelImpl extends AbstractSubchannel {
1768
    final CreateSubchannelArgs args;
1769
    final InternalLogId subchannelLogId;
1770
    final ChannelLoggerImpl subchannelLogger;
1771
    final ChannelTracer subchannelTracer;
1772
    List<EquivalentAddressGroup> addressGroups;
1773
    InternalSubchannel subchannel;
1774
    boolean started;
1775
    boolean shutdown;
1776
    ScheduledHandle delayedShutdownTask;
1777

1778
    SubchannelImpl(CreateSubchannelArgs args) {
1✔
1779
      checkNotNull(args, "args");
1✔
1780
      addressGroups = args.getAddresses();
1✔
1781
      if (authorityOverride != null) {
1✔
1782
        List<EquivalentAddressGroup> eagsWithoutOverrideAttr =
1✔
1783
            stripOverrideAuthorityAttributes(args.getAddresses());
1✔
1784
        args = args.toBuilder().setAddresses(eagsWithoutOverrideAttr).build();
1✔
1785
      }
1786
      this.args = args;
1✔
1787
      subchannelLogId = InternalLogId.allocate("Subchannel", /*details=*/ authority());
1✔
1788
      subchannelTracer = new ChannelTracer(
1✔
1789
          subchannelLogId, maxTraceEvents, timeProvider.currentTimeNanos(),
1✔
1790
          "Subchannel for " + args.getAddresses());
1✔
1791
      subchannelLogger = new ChannelLoggerImpl(subchannelTracer, timeProvider);
1✔
1792
    }
1✔
1793

1794
    @Override
1795
    public void start(final SubchannelStateListener listener) {
1796
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1797
      checkState(!started, "already started");
1✔
1798
      checkState(!shutdown, "already shutdown");
1✔
1799
      checkState(!terminating, "Channel is being terminated");
1✔
1800
      started = true;
1✔
1801
      final class ManagedInternalSubchannelCallback extends InternalSubchannel.Callback {
1✔
1802
        // All callbacks are run in syncContext
1803
        @Override
1804
        void onTerminated(InternalSubchannel is) {
1805
          subchannels.remove(is);
1✔
1806
          channelz.removeSubchannel(is);
1✔
1807
          maybeTerminateChannel();
1✔
1808
        }
1✔
1809

1810
        @Override
1811
        void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) {
1812
          checkState(listener != null, "listener is null");
1✔
1813
          listener.onSubchannelState(newState);
1✔
1814
        }
1✔
1815

1816
        @Override
1817
        void onInUse(InternalSubchannel is) {
1818
          inUseStateAggregator.updateObjectInUse(is, true);
1✔
1819
        }
1✔
1820

1821
        @Override
1822
        void onNotInUse(InternalSubchannel is) {
1823
          inUseStateAggregator.updateObjectInUse(is, false);
1✔
1824
        }
1✔
1825
      }
1826

1827
      final InternalSubchannel internalSubchannel = new InternalSubchannel(
1✔
1828
          args,
1829
          authority(),
1✔
1830
          userAgent,
1✔
1831
          backoffPolicyProvider,
1✔
1832
          transportFactory,
1✔
1833
          transportFactory.getScheduledExecutorService(),
1✔
1834
          stopwatchSupplier,
1✔
1835
          syncContext,
1836
          new ManagedInternalSubchannelCallback(),
1837
          channelz,
1✔
1838
          callTracerFactory.create(),
1✔
1839
          subchannelTracer,
1840
          subchannelLogId,
1841
          subchannelLogger,
1842
          transportFilters, target,
1✔
1843
          lbHelper.getMetricRecorder());
1✔
1844

1845
      channelTracer.reportEvent(new ChannelTrace.Event.Builder()
1✔
1846
          .setDescription("Child Subchannel started")
1✔
1847
          .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
1✔
1848
          .setTimestampNanos(timeProvider.currentTimeNanos())
1✔
1849
          .setSubchannelRef(internalSubchannel)
1✔
1850
          .build());
1✔
1851

1852
      this.subchannel = internalSubchannel;
1✔
1853
      channelz.addSubchannel(internalSubchannel);
1✔
1854
      subchannels.add(internalSubchannel);
1✔
1855
    }
1✔
1856

1857
    @Override
1858
    InternalInstrumented<ChannelStats> getInstrumentedInternalSubchannel() {
1859
      checkState(started, "not started");
1✔
1860
      return subchannel;
1✔
1861
    }
1862

1863
    @Override
1864
    public void shutdown() {
1865
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1866
      if (subchannel == null) {
1✔
1867
        // start() was not successful
1868
        shutdown = true;
×
1869
        return;
×
1870
      }
1871
      if (shutdown) {
1✔
1872
        if (terminating && delayedShutdownTask != null) {
1✔
1873
          // shutdown() was previously called when terminating == false, thus a delayed shutdown()
1874
          // was scheduled.  Now that terminating == true, we should expedite the shutdown.
1875
          delayedShutdownTask.cancel();
×
1876
          delayedShutdownTask = null;
×
1877
          // Will fall through to the subchannel.shutdown() at the end.
1878
        } else {
1879
          return;
1✔
1880
        }
1881
      } else {
1882
        shutdown = true;
1✔
1883
      }
1884
      // Add a delay to shutdown to deal with the race between 1) a transport being picked and
1885
      // newStream() being called on it, and 2) its Subchannel is shut down by LoadBalancer (e.g.,
1886
      // because of address change, or because LoadBalancer is shut down by Channel entering idle
1887
      // mode). If (2) wins, the app will see a spurious error. We work around this by delaying
1888
      // shutdown of Subchannel for a few seconds here.
1889
      //
1890
      // TODO(zhangkun83): consider a better approach
1891
      // (https://github.com/grpc/grpc-java/issues/2562).
1892
      if (!terminating) {
1✔
1893
        final class ShutdownSubchannel implements Runnable {
1✔
1894
          @Override
1895
          public void run() {
1896
            subchannel.shutdown(SUBCHANNEL_SHUTDOWN_STATUS);
1✔
1897
          }
1✔
1898
        }
1899

1900
        delayedShutdownTask = syncContext.schedule(
1✔
1901
            new LogExceptionRunnable(new ShutdownSubchannel()),
1902
            SUBCHANNEL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS,
1903
            transportFactory.getScheduledExecutorService());
1✔
1904
        return;
1✔
1905
      }
1906
      // When terminating == true, no more real streams will be created. It's safe and also
1907
      // desirable to shut down promptly.
1908
      subchannel.shutdown(SHUTDOWN_STATUS);
1✔
1909
    }
1✔
1910

1911
    @Override
1912
    public void requestConnection() {
1913
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1914
      checkState(started, "not started");
1✔
1915
      if (shutdown) {
1✔
1916
        return;
1✔
1917
      }
1918
      subchannel.obtainActiveTransport();
1✔
1919
    }
1✔
1920

1921
    @Override
1922
    public List<EquivalentAddressGroup> getAllAddresses() {
1923
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1924
      checkState(started, "not started");
1✔
1925
      return addressGroups;
1✔
1926
    }
1927

1928
    @Override
1929
    public Attributes getAttributes() {
1930
      return args.getAttributes();
1✔
1931
    }
1932

1933
    @Override
1934
    public String toString() {
1935
      return subchannelLogId.toString();
1✔
1936
    }
1937

1938
    @Override
1939
    public Channel asChannel() {
1940
      checkState(started, "not started");
1✔
1941
      return new SubchannelChannel(
1✔
1942
          subchannel, balancerRpcExecutorHolder.getExecutor(),
1✔
1943
          transportFactory.getScheduledExecutorService(),
1✔
1944
          callTracerFactory.create(),
1✔
1945
          new AtomicReference<InternalConfigSelector>(null));
1946
    }
1947

1948
    @Override
1949
    public Object getInternalSubchannel() {
1950
      checkState(started, "Subchannel is not started");
1✔
1951
      return subchannel;
1✔
1952
    }
1953

1954
    @Override
1955
    public ChannelLogger getChannelLogger() {
1956
      return subchannelLogger;
1✔
1957
    }
1958

1959
    @Override
1960
    public void updateAddresses(List<EquivalentAddressGroup> addrs) {
1961
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1962
      addressGroups = addrs;
1✔
1963
      if (authorityOverride != null) {
1✔
1964
        addrs = stripOverrideAuthorityAttributes(addrs);
1✔
1965
      }
1966
      subchannel.updateAddresses(addrs);
1✔
1967
    }
1✔
1968

1969
    @Override
1970
    public Attributes getConnectedAddressAttributes() {
1971
      return subchannel.getConnectedAddressAttributes();
1✔
1972
    }
1973

1974
    private List<EquivalentAddressGroup> stripOverrideAuthorityAttributes(
1975
        List<EquivalentAddressGroup> eags) {
1976
      List<EquivalentAddressGroup> eagsWithoutOverrideAttr = new ArrayList<>();
1✔
1977
      for (EquivalentAddressGroup eag : eags) {
1✔
1978
        EquivalentAddressGroup eagWithoutOverrideAttr = new EquivalentAddressGroup(
1✔
1979
            eag.getAddresses(),
1✔
1980
            eag.getAttributes().toBuilder().discard(ATTR_AUTHORITY_OVERRIDE).build());
1✔
1981
        eagsWithoutOverrideAttr.add(eagWithoutOverrideAttr);
1✔
1982
      }
1✔
1983
      return Collections.unmodifiableList(eagsWithoutOverrideAttr);
1✔
1984
    }
1985
  }
1986

1987
  @Override
1988
  public String toString() {
1989
    return MoreObjects.toStringHelper(this)
1✔
1990
        .add("logId", logId.getId())
1✔
1991
        .add("target", target)
1✔
1992
        .toString();
1✔
1993
  }
1994

1995
  /**
1996
   * Called from syncContext.
1997
   */
1998
  private final class DelayedTransportListener implements ManagedClientTransport.Listener {
1✔
1999
    @Override
2000
    public void transportShutdown(Status s, DisconnectError e) {
2001
      checkState(shutdown.get(), "Channel must have been shut down");
1✔
2002
    }
1✔
2003

2004
    @Override
2005
    public void transportReady() {
2006
      // Don't care
2007
    }
×
2008

2009
    @Override
2010
    public Attributes filterTransport(Attributes attributes) {
2011
      return attributes;
×
2012
    }
2013

2014
    @Override
2015
    public void transportInUse(final boolean inUse) {
2016
      inUseStateAggregator.updateObjectInUse(delayedTransport, inUse);
1✔
2017
      if (inUse) {
1✔
2018
        // It's possible to be in idle mode while inUseStateAggregator is in-use, if one of the
2019
        // subchannels is in use. But we should never be in idle mode when delayed transport is in
2020
        // use.
2021
        exitIdleMode();
1✔
2022
      }
2023
    }
1✔
2024

2025
    @Override
2026
    public void transportTerminated() {
2027
      checkState(shutdown.get(), "Channel must have been shut down");
1✔
2028
      terminating = true;
1✔
2029
      shutdownNameResolverAndLoadBalancer(false);
1✔
2030
      // No need to call channelStateManager since we are already in SHUTDOWN state.
2031
      // Until LoadBalancer is shutdown, it may still create new subchannels.  We catch them
2032
      // here.
2033
      maybeShutdownNowSubchannels();
1✔
2034
      maybeTerminateChannel();
1✔
2035
    }
1✔
2036
  }
2037

2038
  /**
2039
   * Must be accessed from syncContext.
2040
   */
2041
  private final class IdleModeStateAggregator extends InUseStateAggregator<Object> {
1✔
2042
    @Override
2043
    protected void handleInUse() {
2044
      exitIdleMode();
1✔
2045
    }
1✔
2046

2047
    @Override
2048
    protected void handleNotInUse() {
2049
      if (shutdown.get()) {
1✔
2050
        return;
1✔
2051
      }
2052
      rescheduleIdleTimer();
1✔
2053
    }
1✔
2054
  }
2055

2056
  /**
2057
   * Lazily requests an Executor from an executor pool.
2058
   * Also acts as an Executor directly, to simply run a command.
2059
   */
2060
  @VisibleForTesting
2061
  static final class ExecutorHolder implements Executor {
2062
    private final ObjectPool<? extends Executor> pool;
2063
    private Executor executor;
2064

2065
    ExecutorHolder(ObjectPool<? extends Executor> executorPool) {
1✔
2066
      this.pool = checkNotNull(executorPool, "executorPool");
1✔
2067
    }
1✔
2068

2069
    synchronized Executor getExecutor() {
2070
      if (executor == null) {
1✔
2071
        executor = checkNotNull(pool.getObject(), "%s.getObject()", pool);
1✔
2072
      }
2073
      return executor;
1✔
2074
    }
2075

2076
    synchronized void release() {
2077
      if (executor != null) {
1✔
2078
        executor = pool.returnObject(executor);
1✔
2079
      }
2080
    }
1✔
2081

2082
    @Override
2083
    public void execute(Runnable command) {
2084
      getExecutor().execute(command);
1✔
2085
    }
1✔
2086
  }
2087

2088
  private static final class RestrictedScheduledExecutor implements ScheduledExecutorService {
2089
    final ScheduledExecutorService delegate;
2090

2091
    private RestrictedScheduledExecutor(ScheduledExecutorService delegate) {
1✔
2092
      this.delegate = checkNotNull(delegate, "delegate");
1✔
2093
    }
1✔
2094

2095
    @Override
2096
    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
2097
      return delegate.schedule(callable, delay, unit);
×
2098
    }
2099

2100
    @Override
2101
    public ScheduledFuture<?> schedule(Runnable cmd, long delay, TimeUnit unit) {
2102
      return delegate.schedule(cmd, delay, unit);
1✔
2103
    }
2104

2105
    @Override
2106
    public ScheduledFuture<?> scheduleAtFixedRate(
2107
        Runnable command, long initialDelay, long period, TimeUnit unit) {
2108
      return delegate.scheduleAtFixedRate(command, initialDelay, period, unit);
1✔
2109
    }
2110

2111
    @Override
2112
    public ScheduledFuture<?> scheduleWithFixedDelay(
2113
        Runnable command, long initialDelay, long delay, TimeUnit unit) {
2114
      return delegate.scheduleWithFixedDelay(command, initialDelay, delay, unit);
×
2115
    }
2116

2117
    @Override
2118
    public boolean awaitTermination(long timeout, TimeUnit unit)
2119
        throws InterruptedException {
2120
      return delegate.awaitTermination(timeout, unit);
×
2121
    }
2122

2123
    @Override
2124
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
2125
        throws InterruptedException {
2126
      return delegate.invokeAll(tasks);
×
2127
    }
2128

2129
    @Override
2130
    public <T> List<Future<T>> invokeAll(
2131
        Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
2132
        throws InterruptedException {
2133
      return delegate.invokeAll(tasks, timeout, unit);
×
2134
    }
2135

2136
    @Override
2137
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
2138
        throws InterruptedException, ExecutionException {
2139
      return delegate.invokeAny(tasks);
×
2140
    }
2141

2142
    @Override
2143
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
2144
        throws InterruptedException, ExecutionException, TimeoutException {
2145
      return delegate.invokeAny(tasks, timeout, unit);
×
2146
    }
2147

2148
    @Override
2149
    public boolean isShutdown() {
2150
      return delegate.isShutdown();
×
2151
    }
2152

2153
    @Override
2154
    public boolean isTerminated() {
2155
      return delegate.isTerminated();
×
2156
    }
2157

2158
    @Override
2159
    public void shutdown() {
2160
      throw new UnsupportedOperationException("Restricted: shutdown() is not allowed");
1✔
2161
    }
2162

2163
    @Override
2164
    public List<Runnable> shutdownNow() {
2165
      throw new UnsupportedOperationException("Restricted: shutdownNow() is not allowed");
1✔
2166
    }
2167

2168
    @Override
2169
    public <T> Future<T> submit(Callable<T> task) {
2170
      return delegate.submit(task);
×
2171
    }
2172

2173
    @Override
2174
    public Future<?> submit(Runnable task) {
2175
      return delegate.submit(task);
×
2176
    }
2177

2178
    @Override
2179
    public <T> Future<T> submit(Runnable task, T result) {
2180
      return delegate.submit(task, result);
×
2181
    }
2182

2183
    @Override
2184
    public void execute(Runnable command) {
2185
      delegate.execute(command);
×
2186
    }
×
2187
  }
2188

2189
  /**
2190
   * A ResolutionState indicates the status of the last name resolution.
2191
   */
2192
  enum ResolutionState {
1✔
2193
    NO_RESOLUTION,
1✔
2194
    SUCCESS,
1✔
2195
    ERROR
1✔
2196
  }
2197
}
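
The service-config selection in NameResolverListener.onResult2() above follows a fixed fallback order: a valid config from the resolver, then the channel's default config, then either the previous config or a propagated error, and finally the empty config. The sketch below restates that order as a standalone function so the branches are easier to follow. It is a simplified model, not the gRPC implementation: the class ServiceConfigFallbackSketch, the Outcome type, the choose() method, and the use of plain strings in place of ManagedChannelServiceConfig and Status are all assumptions made for illustration.

```java
/**
 * Hypothetical, simplified model of the fallback order used when choosing the
 * effective service config. Strings stand in for the real config and status types.
 */
final class ServiceConfigFallbackSketch {
  /** Illustrative stand-in for EMPTY_SERVICE_CONFIG. */
  static final String EMPTY = "<empty>";

  /** Result of one decision; error is non-null only when the error is propagated. */
  static final class Outcome {
    final String effectiveConfig;
    final String error;

    private Outcome(String effectiveConfig, String error) {
      this.effectiveConfig = effectiveConfig;
      this.error = error;
    }

    static Outcome config(String config) {
      return new Outcome(config, null);
    }

    static Outcome error(String error) {
      return new Outcome(null, error);
    }
  }

  static Outcome choose(
      boolean lookUpServiceConfig,
      String resolvedConfig,
      String resolverError,
      String defaultConfig,
      String lastConfig,
      boolean serviceConfigUpdated) {
    if (!lookUpServiceConfig) {
      // Channel settings discard whatever the resolver returned.
      return Outcome.config(defaultConfig == null ? EMPTY : defaultConfig);
    }
    if (resolvedConfig != null) {
      return Outcome.config(resolvedConfig);
    }
    if (defaultConfig != null) {
      return Outcome.config(defaultConfig);
    }
    if (resolverError != null) {
      // Only the first resolution propagates the error; later ones keep the last config.
      return serviceConfigUpdated ? Outcome.config(lastConfig) : Outcome.error(resolverError);
    }
    return Outcome.config(EMPTY);
  }

  public static void main(String[] args) {
    Outcome first = choose(true, null, "invalid config", null, null, false);
    System.out.println("first resolution, invalid config: error=" + first.error);
    Outcome later = choose(true, null, "invalid config", null, "previous", true);
    System.out.println("later resolution, invalid config: config=" + later.effectiveConfig);
  }
}
```

Keeping the decision in a pure function like this makes each branch testable without a SynchronizationContext or a LoadBalancer; the real method additionally updates the config selector, logs, and hands the result to the load balancer.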