• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #20136

06 Jan 2026 05:27AM UTC coverage: 88.693% (+0.01%) from 88.681%
#20136

push

github

web-flow
core: Implement oobChannel with resolvingOobChannel

The most important part of this change is to ensure that CallCredentials
are not propagated to the OOB channel. Because the authority of the OOB
channel doesn't match the parent channel, we must ensure that any bearer
tokens are not sent to the different server. However, this was not a
problem because resolvingOobChannel has the same constraint. (RLS has a
different constraint, but we were able to let RLS manage that itself.)

This commit does change the behavior of channelz, shutdown, and metrics
for the OOB channel. Previously the OOB channel was registered with
channelz, but it is only a TODO for resolving channel. Channel shutdown
no longer shuts down the OOB channel and it no longer waits for the OOB
channel to terminate before becoming terminated itself. That is also a
pre-existing TODO. Since ManagedChannelImplBuilder is now being used,
global configurators and census are enabled. The proper behavior here is
still being determined, but we would want it to be the same for
resolving OOB channel and OOB channel.

The OOB channel used to refresh the name resolution when the subchannel
went IDLE or TF. That is an older behavior from back when regular
subchannels would also cause the name resolver to refresh. Nowadays
that goes through the LB tree. gRPC-LB already refreshes name resolution
when its RPC closes, so no longer doing it automatically should be fine.

balancerRpcExecutorPool no longer has its lifetime managed by the child.
It'd be easiest to not use it at all from OOB channel, which wouldn't
actually change the regular behavior, as channels already use the same
executor by default. However, the tests are making use of the executor
being injected, so some propagation needs to be preserved.

Lots of OOB channel tests were deleted, but these were either testing
OobChannel, which is now gone, or things like channelz, which are known
to no longer work like before.

35361 of 39869 relevant lines covered (88.69%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.25
/../core/src/main/java/io/grpc/internal/InternalSubchannel.java
1
/*
2
 * Copyright 2015 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import static io.grpc.ConnectivityState.CONNECTING;
20
import static io.grpc.ConnectivityState.IDLE;
21
import static io.grpc.ConnectivityState.READY;
22
import static io.grpc.ConnectivityState.SHUTDOWN;
23
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
24

25
import com.google.common.annotations.VisibleForTesting;
26
import com.google.common.base.MoreObjects;
27
import com.google.common.base.Preconditions;
28
import com.google.common.base.Stopwatch;
29
import com.google.common.base.Supplier;
30
import com.google.common.util.concurrent.ListenableFuture;
31
import com.google.common.util.concurrent.SettableFuture;
32
import com.google.errorprone.annotations.ForOverride;
33
import io.grpc.Attributes;
34
import io.grpc.CallOptions;
35
import io.grpc.ChannelLogger;
36
import io.grpc.ChannelLogger.ChannelLogLevel;
37
import io.grpc.ClientStreamTracer;
38
import io.grpc.ClientTransportFilter;
39
import io.grpc.ConnectivityState;
40
import io.grpc.ConnectivityStateInfo;
41
import io.grpc.EquivalentAddressGroup;
42
import io.grpc.HttpConnectProxiedSocketAddress;
43
import io.grpc.InternalChannelz;
44
import io.grpc.InternalChannelz.ChannelStats;
45
import io.grpc.InternalInstrumented;
46
import io.grpc.InternalLogId;
47
import io.grpc.InternalWithLogId;
48
import io.grpc.LoadBalancer;
49
import io.grpc.Metadata;
50
import io.grpc.MethodDescriptor;
51
import io.grpc.MetricRecorder;
52
import io.grpc.NameResolver;
53
import io.grpc.SecurityLevel;
54
import io.grpc.Status;
55
import io.grpc.SynchronizationContext;
56
import io.grpc.SynchronizationContext.ScheduledHandle;
57
import java.net.SocketAddress;
58
import java.util.ArrayList;
59
import java.util.Collection;
60
import java.util.Collections;
61
import java.util.List;
62
import java.util.concurrent.ScheduledExecutorService;
63
import java.util.concurrent.TimeUnit;
64
import javax.annotation.Nullable;
65
import javax.annotation.concurrent.ThreadSafe;
66

67
/**
68
 * Transports for a single {@link SocketAddress}.
69
 */
70
@ThreadSafe
71
final class InternalSubchannel implements InternalInstrumented<ChannelStats>, TransportProvider {
72

73
  private final InternalLogId logId;
74
  private final String authority;
75
  private final String userAgent;
76
  private final BackoffPolicy.Provider backoffPolicyProvider;
77
  private final Callback callback;
78
  private final ClientTransportFactory transportFactory;
79
  private final ScheduledExecutorService scheduledExecutor;
80
  private final InternalChannelz channelz;
81
  private final CallTracer callsTracer;
82
  private final ChannelTracer channelTracer;
83
  private final ChannelLogger channelLogger;
84
  private final boolean reconnectDisabled;
85

86
  private final List<ClientTransportFilter> transportFilters;
87

88
  /**
89
   * All fields must be mutated in the syncContext.
90
   */
91
  private final SynchronizationContext syncContext;
92

93
  /**
94
   * The index of the address corresponding to pendingTransport/activeTransport, or at beginning if
95
   * both are null.
96
   *
97
   * <p>Note: any {@link Index#updateAddresses(List)} should also update {@link #addressGroups}.
98
   */
99
  private final Index addressIndex;
100

101
  /**
102
   * A volatile accessor to {@link Index#getAddressGroups()}. There are few methods ({@link
103
   * #getAddressGroups()} and {@link #toString()}) that read this value instead of reading the index
104
   * in the {@link #syncContext}. Ideally {@link Index#getAddressGroups()} can be volatile, so we
105
   * don't need to maintain this volatile accessor. Although, having this accessor can reduce
106
   * unnecessary volatile reads while making the intent clearer.
107
   */
108
  private volatile List<EquivalentAddressGroup> addressGroups;
109

110
  /**
111
   * The policy to control back off between reconnects. Non-{@code null} when a reconnect task is
112
   * scheduled.
113
   */
114
  private BackoffPolicy reconnectPolicy;
115

116
  /**
117
   * Timer monitoring duration since entering CONNECTING state.
118
   */
119
  private final Stopwatch connectingTimer;
120

121
  @Nullable
122
  private ScheduledHandle reconnectTask;
123
  @Nullable
124
  private ScheduledHandle shutdownDueToUpdateTask;
125
  @Nullable
126
  private ManagedClientTransport shutdownDueToUpdateTransport;
127

128
  /**
129
   * All transports that are not terminated. At the very least the value of {@link #activeTransport}
130
   * will be present, but previously used transports that still have streams or are stopping may
131
   * also be present.
132
   */
133
  private final Collection<ConnectionClientTransport> transports = new ArrayList<>();
1✔
134

135
  // Must only be used from syncContext
136
  private final InUseStateAggregator<ConnectionClientTransport> inUseStateAggregator =
1✔
137
      new InUseStateAggregator<ConnectionClientTransport>() {
1✔
138
        @Override
139
        protected void handleInUse() {
140
          callback.onInUse(InternalSubchannel.this);
1✔
141
        }
1✔
142

143
        @Override
144
        protected void handleNotInUse() {
145
          callback.onNotInUse(InternalSubchannel.this);
1✔
146
        }
1✔
147
      };
148

149
  /**
150
   * The to-be active transport, which is not ready yet.
151
   */
152
  @Nullable
153
  private ConnectionClientTransport pendingTransport;
154

155
  /**
156
   * The transport for new outgoing requests. Non-null only in READY state.
157
   */
158
  @Nullable
159
  private volatile ManagedClientTransport activeTransport;
160

161
  private volatile ConnectivityStateInfo state = ConnectivityStateInfo.forNonError(IDLE);
1✔
162

163
  private Status shutdownReason;
164

165
  private volatile Attributes connectedAddressAttributes;
166
  private final SubchannelMetrics subchannelMetrics;
167
  private final String target;
168

169
  InternalSubchannel(LoadBalancer.CreateSubchannelArgs args, String authority, String userAgent,
170
                     BackoffPolicy.Provider backoffPolicyProvider,
171
                     ClientTransportFactory transportFactory,
172
                     ScheduledExecutorService scheduledExecutor,
173
                     Supplier<Stopwatch> stopwatchSupplier, SynchronizationContext syncContext,
174
                     Callback callback, InternalChannelz channelz, CallTracer callsTracer,
175
                     ChannelTracer channelTracer, InternalLogId logId,
176
                     ChannelLogger channelLogger, List<ClientTransportFilter> transportFilters,
177
                     String target,
178
                     MetricRecorder metricRecorder) {
1✔
179
    List<EquivalentAddressGroup> addressGroups = args.getAddresses();
1✔
180
    Preconditions.checkNotNull(addressGroups, "addressGroups");
1✔
181
    Preconditions.checkArgument(!addressGroups.isEmpty(), "addressGroups is empty");
1✔
182
    checkListHasNoNulls(addressGroups, "addressGroups contains null entry");
1✔
183
    List<EquivalentAddressGroup> unmodifiableAddressGroups =
1✔
184
        Collections.unmodifiableList(new ArrayList<>(addressGroups));
1✔
185
    this.addressGroups = unmodifiableAddressGroups;
1✔
186
    this.addressIndex = new Index(unmodifiableAddressGroups);
1✔
187
    this.authority = authority;
1✔
188
    this.userAgent = userAgent;
1✔
189
    this.backoffPolicyProvider = backoffPolicyProvider;
1✔
190
    this.transportFactory = transportFactory;
1✔
191
    this.scheduledExecutor = scheduledExecutor;
1✔
192
    this.connectingTimer = stopwatchSupplier.get();
1✔
193
    this.syncContext = syncContext;
1✔
194
    this.callback = callback;
1✔
195
    this.channelz = channelz;
1✔
196
    this.callsTracer = callsTracer;
1✔
197
    this.channelTracer = Preconditions.checkNotNull(channelTracer, "channelTracer");
1✔
198
    this.logId = Preconditions.checkNotNull(logId, "logId");
1✔
199
    this.channelLogger = Preconditions.checkNotNull(channelLogger, "channelLogger");
1✔
200
    this.transportFilters = transportFilters;
1✔
201
    this.reconnectDisabled = args.getOption(LoadBalancer.DISABLE_SUBCHANNEL_RECONNECT_KEY);
1✔
202
    this.target = target;
1✔
203
    this.subchannelMetrics = new SubchannelMetrics(metricRecorder);
1✔
204
  }
1✔
205

206
  ChannelLogger getChannelLogger() {
207
    return channelLogger;
1✔
208
  }
209

210
  @Override
211
  public ClientTransport obtainActiveTransport() {
212
    ClientTransport savedTransport = activeTransport;
1✔
213
    if (savedTransport != null) {
1✔
214
      return savedTransport;
1✔
215
    }
216
    syncContext.execute(new Runnable() {
1✔
217
      @Override
218
      public void run() {
219
        if (state.getState() == IDLE) {
1✔
220
          channelLogger.log(ChannelLogLevel.INFO, "CONNECTING as requested");
1✔
221
          gotoNonErrorState(CONNECTING);
1✔
222
          startNewTransport();
1✔
223
        }
224
      }
1✔
225
    });
226
    return null;
1✔
227
  }
228

229
  /**
230
   * Returns a READY transport if there is any, without trying to connect.
231
   */
232
  @Nullable
233
  ClientTransport getTransport() {
234
    return activeTransport;
1✔
235
  }
236

237
  /**
238
   * Returns the authority string associated with this Subchannel.
239
   */
240
  String getAuthority() {
241
    return authority;
×
242
  }
243

244
  private void startNewTransport() {
245
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
246

247
    Preconditions.checkState(reconnectTask == null, "Should have no reconnectTask scheduled");
1✔
248

249
    if (addressIndex.isAtBeginning()) {
1✔
250
      connectingTimer.reset().start();
1✔
251
    }
252
    SocketAddress address = addressIndex.getCurrentAddress();
1✔
253

254
    HttpConnectProxiedSocketAddress proxiedAddr = null;
1✔
255
    if (address instanceof HttpConnectProxiedSocketAddress) {
1✔
256
      proxiedAddr = (HttpConnectProxiedSocketAddress) address;
×
257
      address = proxiedAddr.getTargetAddress();
×
258
    }
259

260
    Attributes currentEagAttributes = addressIndex.getCurrentEagAttributes();
1✔
261
    String eagChannelAuthority = currentEagAttributes
1✔
262
            .get(EquivalentAddressGroup.ATTR_AUTHORITY_OVERRIDE);
1✔
263
    ClientTransportFactory.ClientTransportOptions options =
1✔
264
        new ClientTransportFactory.ClientTransportOptions()
265
          .setAuthority(eagChannelAuthority != null ? eagChannelAuthority : authority)
1✔
266
          .setEagAttributes(currentEagAttributes)
1✔
267
          .setUserAgent(userAgent)
1✔
268
          .setHttpConnectProxiedSocketAddress(proxiedAddr);
1✔
269
    TransportLogger transportLogger = new TransportLogger();
1✔
270
    // In case the transport logs in the constructor, use the subchannel logId
271
    transportLogger.logId = getLogId();
1✔
272
    ConnectionClientTransport transport =
1✔
273
        new CallTracingTransport(
274
            transportFactory
275
                .newClientTransport(address, options, transportLogger), callsTracer);
1✔
276
    transportLogger.logId = transport.getLogId();
1✔
277
    channelz.addClientSocket(transport);
1✔
278
    pendingTransport = transport;
1✔
279
    transports.add(transport);
1✔
280
    Runnable runnable = transport.start(new TransportListener(transport));
1✔
281
    if (runnable != null) {
1✔
282
      syncContext.executeLater(runnable);
1✔
283
    }
284
    channelLogger.log(ChannelLogLevel.INFO, "Started transport {0}", transportLogger.logId);
1✔
285
  }
1✔
286

287
  /**
288
   * Only called after all addresses attempted and failed (TRANSIENT_FAILURE).
289
   * @param status the causal status when the channel begins transition to
290
   *     TRANSIENT_FAILURE.
291
   */
292
  private void scheduleBackoff(final Status status) {
293
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
294

295
    class EndOfCurrentBackoff implements Runnable {
1✔
296
      @Override
297
      public void run() {
298
        reconnectTask = null;
1✔
299
        channelLogger.log(ChannelLogLevel.INFO, "CONNECTING after backoff");
1✔
300
        gotoNonErrorState(CONNECTING);
1✔
301
        startNewTransport();
1✔
302
      }
1✔
303
    }
304

305
    gotoState(ConnectivityStateInfo.forTransientFailure(status));
1✔
306

307
    if (reconnectDisabled) {
1✔
308
      return;
1✔
309
    }
310

311
    if (reconnectPolicy == null) {
1✔
312
      reconnectPolicy = backoffPolicyProvider.get();
1✔
313
    }
314
    long delayNanos =
1✔
315
        reconnectPolicy.nextBackoffNanos() - connectingTimer.elapsed(TimeUnit.NANOSECONDS);
1✔
316
    channelLogger.log(
1✔
317
        ChannelLogLevel.INFO,
318
        "TRANSIENT_FAILURE ({0}). Will reconnect after {1} ns",
319
        printShortStatus(status), delayNanos);
1✔
320
    Preconditions.checkState(reconnectTask == null, "previous reconnectTask is not done");
1✔
321
    reconnectTask = syncContext.schedule(
1✔
322
        new EndOfCurrentBackoff(),
323
        delayNanos,
324
        TimeUnit.NANOSECONDS,
325
        scheduledExecutor);
326
  }
1✔
327

328
  /**
329
   * Immediately attempt to reconnect if the current state is TRANSIENT_FAILURE. Otherwise, this
330
   * method has no effect.
331
   */
332
  void resetConnectBackoff() {
333
    syncContext.execute(new Runnable() {
1✔
334
      @Override
335
      public void run() {
336
        if (state.getState() != TRANSIENT_FAILURE) {
1✔
337
          return;
1✔
338
        }
339
        cancelReconnectTask();
1✔
340
        channelLogger.log(ChannelLogLevel.INFO, "CONNECTING; backoff interrupted");
1✔
341
        gotoNonErrorState(CONNECTING);
1✔
342
        startNewTransport();
1✔
343
      }
1✔
344
    });
345
  }
1✔
346

347
  private void gotoNonErrorState(final ConnectivityState newState) {
348
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
349

350
    gotoState(ConnectivityStateInfo.forNonError(newState));
1✔
351
  }
1✔
352

353
  private void gotoState(final ConnectivityStateInfo newState) {
354
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
355

356
    if (state.getState() != newState.getState()) {
1✔
357
      Preconditions.checkState(state.getState() != SHUTDOWN,
1✔
358
          "Cannot transition out of SHUTDOWN to %s", newState.getState());
1✔
359
      if (reconnectDisabled && newState.getState() == TRANSIENT_FAILURE) {
1✔
360
        state = ConnectivityStateInfo.forNonError(IDLE);
1✔
361
      } else {
362
        state = newState;
1✔
363
      }
364
      callback.onStateChange(InternalSubchannel.this, newState);
1✔
365
    }
366
  }
1✔
367

368
  /** Replaces the existing addresses, avoiding unnecessary reconnects. */
369
  public void updateAddresses(final List<EquivalentAddressGroup> newAddressGroups) {
370
    Preconditions.checkNotNull(newAddressGroups, "newAddressGroups");
1✔
371
    checkListHasNoNulls(newAddressGroups, "newAddressGroups contains null entry");
1✔
372
    Preconditions.checkArgument(!newAddressGroups.isEmpty(), "newAddressGroups is empty");
1✔
373
    final List<EquivalentAddressGroup> newImmutableAddressGroups =
1✔
374
        Collections.unmodifiableList(new ArrayList<>(newAddressGroups));
1✔
375

376
    syncContext.execute(new Runnable() {
1✔
377
      @Override
378
      public void run() {
379
        ManagedClientTransport savedTransport = null;
1✔
380
        SocketAddress previousAddress = addressIndex.getCurrentAddress();
1✔
381
        addressIndex.updateGroups(newImmutableAddressGroups);
1✔
382
        addressGroups = newImmutableAddressGroups;
1✔
383
        if (state.getState() == READY || state.getState() == CONNECTING) {
1✔
384
          if (!addressIndex.seekTo(previousAddress)) {
1✔
385
            // Forced to drop the connection
386
            if (state.getState() == READY) {
1✔
387
              savedTransport = activeTransport;
1✔
388
              activeTransport = null;
1✔
389
              addressIndex.reset();
1✔
390
              gotoNonErrorState(IDLE);
1✔
391
            } else {
392
              pendingTransport.shutdown(
1✔
393
                  Status.UNAVAILABLE.withDescription(
1✔
394
                    "InternalSubchannel closed pending transport due to address change"));
395
              pendingTransport = null;
1✔
396
              addressIndex.reset();
1✔
397
              startNewTransport();
1✔
398
            }
399
          }
400
        }
401
        if (savedTransport != null) {
1✔
402
          if (shutdownDueToUpdateTask != null) {
1✔
403
            // Keeping track of multiple shutdown tasks adds complexity, and shouldn't generally be
404
            // necessary. This transport has probably already had plenty of time.
405
            shutdownDueToUpdateTransport.shutdown(
1✔
406
                Status.UNAVAILABLE.withDescription(
1✔
407
                    "InternalSubchannel closed transport early due to address change"));
408
            shutdownDueToUpdateTask.cancel();
1✔
409
            shutdownDueToUpdateTask = null;
1✔
410
            shutdownDueToUpdateTransport = null;
1✔
411
          }
412
          // Avoid needless RPC failures by delaying the shutdown. See
413
          // https://github.com/grpc/grpc-java/issues/2562
414
          shutdownDueToUpdateTransport = savedTransport;
1✔
415
          shutdownDueToUpdateTask = syncContext.schedule(
1✔
416
              new Runnable() {
1✔
417
                @Override public void run() {
418
                  ManagedClientTransport transport = shutdownDueToUpdateTransport;
1✔
419
                  shutdownDueToUpdateTask = null;
1✔
420
                  shutdownDueToUpdateTransport = null;
1✔
421
                  transport.shutdown(
1✔
422
                      Status.UNAVAILABLE.withDescription(
1✔
423
                          "InternalSubchannel closed transport due to address change"));
424
                }
1✔
425
              },
426
              ManagedChannelImpl.SUBCHANNEL_SHUTDOWN_DELAY_SECONDS,
427
              TimeUnit.SECONDS,
428
              scheduledExecutor);
1✔
429
        }
430
      }
1✔
431
    });
432
  }
1✔
433

434
  public void shutdown(final Status reason) {
435
    syncContext.execute(new Runnable() {
1✔
436
      @Override
437
      public void run() {
438
        ManagedClientTransport savedActiveTransport;
439
        ConnectionClientTransport savedPendingTransport;
440
        if (state.getState() == SHUTDOWN) {
1✔
441
          return;
1✔
442
        }
443
        shutdownReason = reason;
1✔
444
        savedActiveTransport = activeTransport;
1✔
445
        savedPendingTransport = pendingTransport;
1✔
446
        activeTransport = null;
1✔
447
        pendingTransport = null;
1✔
448
        gotoNonErrorState(SHUTDOWN);
1✔
449
        addressIndex.reset();
1✔
450
        if (transports.isEmpty()) {
1✔
451
          handleTermination();
1✔
452
        }  // else: the callback will be run once all transports have been terminated
453
        cancelReconnectTask();
1✔
454
        if (shutdownDueToUpdateTask != null) {
1✔
455
          shutdownDueToUpdateTask.cancel();
1✔
456
          shutdownDueToUpdateTransport.shutdown(reason);
1✔
457
          shutdownDueToUpdateTask = null;
1✔
458
          shutdownDueToUpdateTransport = null;
1✔
459
        }
460
        if (savedActiveTransport != null) {
1✔
461
          savedActiveTransport.shutdown(reason);
1✔
462
        }
463
        if (savedPendingTransport != null) {
1✔
464
          savedPendingTransport.shutdown(reason);
1✔
465
        }
466
      }
1✔
467
    });
468
  }
1✔
469

470
  @Override
471
  public String toString() {
472
    // addressGroupsCopy being a little stale is fine, just avoid calling toString with the lock
473
    // since there may be many addresses.
474
    return MoreObjects.toStringHelper(this)
×
475
        .add("logId", logId.getId())
×
476
        .add("addressGroups", addressGroups)
×
477
        .toString();
×
478
  }
479

480
  private void handleTermination() {
481
    syncContext.execute(new Runnable() {
1✔
482
      @Override
483
      public void run() {
484
        channelLogger.log(ChannelLogLevel.INFO, "Terminated");
1✔
485
        callback.onTerminated(InternalSubchannel.this);
1✔
486
      }
1✔
487
    });
488
  }
1✔
489

490
  private void handleTransportInUseState(
491
      final ConnectionClientTransport transport, final boolean inUse) {
492
    syncContext.execute(new Runnable() {
1✔
493
      @Override
494
      public void run() {
495
        inUseStateAggregator.updateObjectInUse(transport, inUse);
1✔
496
      }
1✔
497
    });
498
  }
1✔
499

500
  void shutdownNow(final Status reason) {
501
    shutdown(reason);
1✔
502
    syncContext.execute(new Runnable() {
1✔
503
      @Override
504
      public void run() {
505
        Collection<ManagedClientTransport> transportsCopy =
1✔
506
            new ArrayList<ManagedClientTransport>(transports);
1✔
507

508
        for (ManagedClientTransport transport : transportsCopy) {
1✔
509
          transport.shutdownNow(reason);
1✔
510
        }
1✔
511
      }
1✔
512
    });
513
  }
1✔
514

515
  List<EquivalentAddressGroup> getAddressGroups() {
516
    return addressGroups;
1✔
517
  }
518

519
  private void cancelReconnectTask() {
520
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
521

522
    if (reconnectTask != null) {
1✔
523
      reconnectTask.cancel();
1✔
524
      reconnectTask = null;
1✔
525
      reconnectPolicy = null;
1✔
526
    }
527
  }
1✔
528

529
  @Override
530
  public InternalLogId getLogId() {
531
    return logId;
1✔
532
  }
533

534
  @Override
535
  public ListenableFuture<ChannelStats> getStats() {
536
    final SettableFuture<ChannelStats> channelStatsFuture = SettableFuture.create();
1✔
537
    syncContext.execute(new Runnable() {
1✔
538
      @Override
539
      public void run() {
540
        ChannelStats.Builder builder = new ChannelStats.Builder();
1✔
541
        List<EquivalentAddressGroup> addressGroupsSnapshot = addressIndex.getGroups();
1✔
542
        List<InternalWithLogId> transportsSnapshot = new ArrayList<InternalWithLogId>(transports);
1✔
543
        builder.setTarget(addressGroupsSnapshot.toString()).setState(getState());
1✔
544
        builder.setSockets(transportsSnapshot);
1✔
545
        callsTracer.updateBuilder(builder);
1✔
546
        channelTracer.updateBuilder(builder);
1✔
547
        channelStatsFuture.set(builder.build());
1✔
548
      }
1✔
549
    });
550
    return channelStatsFuture;
1✔
551
  }
552

553
  /**
554
   * Return attributes for server address connected by sub channel.
555
   */
556
  public Attributes getConnectedAddressAttributes() {
557
    return connectedAddressAttributes;
1✔
558
  }
559

560
  ConnectivityState getState() {
561
    return state.getState();
1✔
562
  }
563

564
  private static void checkListHasNoNulls(List<?> list, String msg) {
565
    for (Object item : list) {
1✔
566
      Preconditions.checkNotNull(item, msg);
1✔
567
    }
1✔
568
  }
1✔
569

570
  /** Listener for real transports. */
571
  private class TransportListener implements ManagedClientTransport.Listener {
572
    final ConnectionClientTransport transport;
573
    boolean shutdownInitiated = false;
1✔
574

575
    TransportListener(ConnectionClientTransport transport) {
1✔
576
      this.transport = transport;
1✔
577
    }
1✔
578

579
    @Override
580
    public Attributes filterTransport(Attributes attributes) {
581
      for (ClientTransportFilter filter : transportFilters) {
1✔
582
        attributes = Preconditions.checkNotNull(filter.transportReady(attributes),
1✔
583
            "Filter %s returned null", filter);
584
      }
1✔
585
      return attributes;
1✔
586
    }
587

588
    @Override
589
    public void transportReady() {
590
      channelLogger.log(ChannelLogLevel.INFO, "READY");
1✔
591
      syncContext.execute(new Runnable() {
1✔
592
        @Override
593
        public void run() {
594
          reconnectPolicy = null;
1✔
595
          if (shutdownReason != null) {
1✔
596
            // activeTransport should have already been set to null by shutdown(). We keep it null.
597
            Preconditions.checkState(activeTransport == null,
1✔
598
                "Unexpected non-null activeTransport");
599
            transport.shutdown(shutdownReason);
1✔
600
          } else if (pendingTransport == transport) {
1✔
601
            activeTransport = transport;
1✔
602
            pendingTransport = null;
1✔
603
            connectedAddressAttributes = addressIndex.getCurrentEagAttributes();
1✔
604
            gotoNonErrorState(READY);
1✔
605
            subchannelMetrics.recordConnectionAttemptSucceeded(/* target= */ target,
1✔
606
                /* backendService= */ getAttributeOrDefault(
1✔
607
                    addressIndex.getCurrentEagAttributes(), NameResolver.ATTR_BACKEND_SERVICE),
1✔
608
                /* locality= */ getAttributeOrDefault(addressIndex.getCurrentEagAttributes(),
1✔
609
                    EquivalentAddressGroup.ATTR_LOCALITY_NAME),
610
                /* securityLevel= */ extractSecurityLevel(addressIndex.getCurrentEagAttributes()
1✔
611
                    .get(GrpcAttributes.ATTR_SECURITY_LEVEL)));
1✔
612
          }
613
        }
1✔
614
      });
615
    }
1✔
616

617
    @Override
618
    public void transportInUse(boolean inUse) {
619
      handleTransportInUseState(transport, inUse);
1✔
620
    }
1✔
621

622
    @Override
623
    public void transportShutdown(final Status s, final DisconnectError disconnectError) {
624
      channelLogger.log(
1✔
625
          ChannelLogLevel.INFO, "{0} SHUTDOWN with {1}", transport.getLogId(), printShortStatus(s));
1✔
626
      shutdownInitiated = true;
1✔
627
      syncContext.execute(new Runnable() {
1✔
628
        @Override
629
        public void run() {
630
          if (state.getState() == SHUTDOWN) {
1✔
631
            return;
1✔
632
          }
633
          if (activeTransport == transport) {
1✔
634
            activeTransport = null;
1✔
635
            addressIndex.reset();
1✔
636
            gotoNonErrorState(IDLE);
1✔
637
            subchannelMetrics.recordDisconnection(/* target= */ target,
1✔
638
                /* backendService= */ getAttributeOrDefault(addressIndex.getCurrentEagAttributes(),
1✔
639
                    NameResolver.ATTR_BACKEND_SERVICE),
640
                /* locality= */ getAttributeOrDefault(addressIndex.getCurrentEagAttributes(),
1✔
641
                    EquivalentAddressGroup.ATTR_LOCALITY_NAME),
642
                /* disconnectError= */ disconnectError.toErrorString(),
1✔
643
                /* securityLevel= */ extractSecurityLevel(addressIndex.getCurrentEagAttributes()
1✔
644
                    .get(GrpcAttributes.ATTR_SECURITY_LEVEL)));
1✔
645
          } else if (pendingTransport == transport) {
1✔
646
            subchannelMetrics.recordConnectionAttemptFailed(/* target= */ target,
1✔
647
                /* backendService= */getAttributeOrDefault(addressIndex.getCurrentEagAttributes(),
1✔
648
                    NameResolver.ATTR_BACKEND_SERVICE),
649
                /* locality= */ getAttributeOrDefault(addressIndex.getCurrentEagAttributes(),
1✔
650
                    EquivalentAddressGroup.ATTR_LOCALITY_NAME));
651
            Preconditions.checkState(state.getState() == CONNECTING,
1✔
652
                "Expected state is CONNECTING, actual state is %s", state.getState());
1✔
653
            addressIndex.increment();
1✔
654
            // Continue to reconnect if there are still addresses to try.
655
            if (!addressIndex.isValid()) {
1✔
656
              pendingTransport = null;
1✔
657
              addressIndex.reset();
1✔
658
              // Initiate backoff
659
              // Transition to TRANSIENT_FAILURE
660
              scheduleBackoff(s);
1✔
661
            } else {
662
              startNewTransport();
1✔
663
            }
664
          }
665
        }
1✔
666
      });
667
    }
1✔
668

669
    @Override
670
    public void transportTerminated() {
671
      Preconditions.checkState(
1✔
672
          shutdownInitiated, "transportShutdown() must be called before transportTerminated().");
673

674
      channelLogger.log(ChannelLogLevel.INFO, "{0} Terminated", transport.getLogId());
1✔
675
      channelz.removeClientSocket(transport);
1✔
676
      handleTransportInUseState(transport, false);
1✔
677
      for (ClientTransportFilter filter : transportFilters) {
1✔
678
        filter.transportTerminated(transport.getAttributes());
1✔
679
      }
1✔
680
      syncContext.execute(new Runnable() {
1✔
681
        @Override
682
        public void run() {
683
          transports.remove(transport);
1✔
684
          if (state.getState() == SHUTDOWN && transports.isEmpty()) {
1✔
685
            handleTermination();
1✔
686
          }
687
        }
1✔
688
      });
689
    }
1✔
690

691
    private String extractSecurityLevel(SecurityLevel securityLevel) {
692
      if (securityLevel == null) {
1✔
693
        return "none";
1✔
694
      }
695
      switch (securityLevel) {
1✔
696
        case NONE:
697
          return "none";
×
698
        case INTEGRITY:
699
          return "integrity_only";
×
700
        case PRIVACY_AND_INTEGRITY:
701
          return "privacy_and_integrity";
1✔
702
        default:
703
          throw new IllegalArgumentException("Unknown SecurityLevel: " + securityLevel);
×
704
      }
705
    }
706

707
    private String getAttributeOrDefault(Attributes attributes, Attributes.Key<String> key) {
708
      String value = attributes.get(key);
1✔
709
      return value == null ? "" : value;
1✔
710
    }
711
  }
712

713
  // All methods are called in syncContext
714
  abstract static class Callback {
1✔
715
    /**
716
     * Called when the subchannel is terminated, which means it's shut down and all transports
717
     * have been terminated.
718
     */
719
    @ForOverride
720
    void onTerminated(InternalSubchannel is) { }
×
721

722
    /**
723
     * Called when the subchannel's connectivity state has changed.
724
     */
725
    @ForOverride
726
    void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) { }
×
727

728
    /**
729
     * Called when the subchannel's in-use state has changed to true, which means at least one
730
     * transport is in use.
731
     */
732
    @ForOverride
733
    void onInUse(InternalSubchannel is) { }
×
734

735
    /**
736
     * Called when the subchannel's in-use state has changed to false, which means no transport is
737
     * in use.
738
     */
739
    @ForOverride
740
    void onNotInUse(InternalSubchannel is) { }
×
741
  }
742

743
  @VisibleForTesting
744
  static final class CallTracingTransport extends ForwardingConnectionClientTransport {
745
    private final ConnectionClientTransport delegate;
746
    private final CallTracer callTracer;
747

748
    private CallTracingTransport(ConnectionClientTransport delegate, CallTracer callTracer) {
1✔
749
      this.delegate = delegate;
1✔
750
      this.callTracer = callTracer;
1✔
751
    }
1✔
752

753
    @Override
754
    protected ConnectionClientTransport delegate() {
755
      return delegate;
1✔
756
    }
757

758
    @Override
759
    public ClientStream newStream(
760
        MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions,
761
        ClientStreamTracer[] tracers) {
762
      final ClientStream streamDelegate = super.newStream(method, headers, callOptions, tracers);
1✔
763
      return new ForwardingClientStream() {
1✔
764
        @Override
765
        protected ClientStream delegate() {
766
          return streamDelegate;
1✔
767
        }
768

769
        @Override
770
        public void start(final ClientStreamListener listener) {
771
          callTracer.reportCallStarted();
1✔
772
          super.start(new ForwardingClientStreamListener() {
1✔
773
            @Override
774
            protected ClientStreamListener delegate() {
775
              return listener;
1✔
776
            }
777

778
            @Override
779
            public void closed(
780
                Status status, RpcProgress rpcProgress, Metadata trailers) {
781
              callTracer.reportCallEnded(status.isOk());
1✔
782
              super.closed(status, rpcProgress, trailers);
1✔
783
            }
1✔
784
          });
785
        }
1✔
786
      };
787
    }
788
  }
789

790
  /** Index as in 'i', the pointer to an entry. Not a "search index." */
791
  @VisibleForTesting
792
  static final class Index {
793
    private List<EquivalentAddressGroup> addressGroups;
794
    private int groupIndex;
795
    private int addressIndex;
796

797
    public Index(List<EquivalentAddressGroup> groups) {
1✔
798
      this.addressGroups = groups;
1✔
799
    }
1✔
800

801
    public boolean isValid() {
802
      // addressIndex will never be invalid
803
      return groupIndex < addressGroups.size();
1✔
804
    }
805

806
    public boolean isAtBeginning() {
807
      return groupIndex == 0 && addressIndex == 0;
1✔
808
    }
809

810
    public void increment() {
811
      EquivalentAddressGroup group = addressGroups.get(groupIndex);
1✔
812
      addressIndex++;
1✔
813
      if (addressIndex >= group.getAddresses().size()) {
1✔
814
        groupIndex++;
1✔
815
        addressIndex = 0;
1✔
816
      }
817
    }
1✔
818

819
    public void reset() {
820
      groupIndex = 0;
1✔
821
      addressIndex = 0;
1✔
822
    }
1✔
823

824
    public SocketAddress getCurrentAddress() {
825
      return addressGroups.get(groupIndex).getAddresses().get(addressIndex);
1✔
826
    }
827

828
    public Attributes getCurrentEagAttributes() {
829
      return addressGroups.get(groupIndex).getAttributes();
1✔
830
    }
831

832
    public List<EquivalentAddressGroup> getGroups() {
833
      return addressGroups;
1✔
834
    }
835

836
    /** Update to new groups, resetting the current index. */
837
    public void updateGroups(List<EquivalentAddressGroup> newGroups) {
838
      addressGroups = newGroups;
1✔
839
      reset();
1✔
840
    }
1✔
841

842
    /** Returns false if the needle was not found and the current index was left unchanged. */
843
    public boolean seekTo(SocketAddress needle) {
844
      for (int i = 0; i < addressGroups.size(); i++) {
1✔
845
        EquivalentAddressGroup group = addressGroups.get(i);
1✔
846
        int j = group.getAddresses().indexOf(needle);
1✔
847
        if (j == -1) {
1✔
848
          continue;
1✔
849
        }
850
        this.groupIndex = i;
1✔
851
        this.addressIndex = j;
1✔
852
        return true;
1✔
853
      }
854
      return false;
1✔
855
    }
856
  }
857

858
  private String printShortStatus(Status status) {
859
    StringBuilder buffer = new StringBuilder();
1✔
860
    buffer.append(status.getCode());
1✔
861
    if (status.getDescription() != null) {
1✔
862
      buffer.append("(").append(status.getDescription()).append(")");
1✔
863
    }
864
    if (status.getCause() != null) {
1✔
865
      buffer.append("[").append(status.getCause()).append("]");
1✔
866
    }
867
    return buffer.toString();
1✔
868
  }
869

870
  @VisibleForTesting
871
  static final class TransportLogger extends ChannelLogger {
1✔
872
    // Changed just after construction to break a cyclic dependency.
873
    InternalLogId logId;
874

875
    @Override
876
    public void log(ChannelLogLevel level, String message) {
877
      ChannelLoggerImpl.logOnly(logId, level, message);
1✔
878
    }
1✔
879

880
    @Override
881
    public void log(ChannelLogLevel level, String messageFormat, Object... args) {
882
      ChannelLoggerImpl.logOnly(logId, level, messageFormat, args);
1✔
883
    }
1✔
884
  }
885
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc