• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #20230

31 Mar 2026 09:55AM UTC coverage: 88.734% (+0.01%) from 88.72%
#20230

push

github

web-flow
openTelemetry: add tcp metrics (#12652)

Implements [A80](https://github.com/grpc/proposal/pull/519)

35697 of 40229 relevant lines covered (88.73%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.27
/../core/src/main/java/io/grpc/internal/InternalSubchannel.java
1
/*
2
 * Copyright 2015 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import static io.grpc.ConnectivityState.CONNECTING;
20
import static io.grpc.ConnectivityState.IDLE;
21
import static io.grpc.ConnectivityState.READY;
22
import static io.grpc.ConnectivityState.SHUTDOWN;
23
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
24

25
import com.google.common.annotations.VisibleForTesting;
26
import com.google.common.base.MoreObjects;
27
import com.google.common.base.Preconditions;
28
import com.google.common.base.Stopwatch;
29
import com.google.common.base.Supplier;
30
import com.google.common.util.concurrent.ListenableFuture;
31
import com.google.common.util.concurrent.SettableFuture;
32
import com.google.errorprone.annotations.ForOverride;
33
import io.grpc.Attributes;
34
import io.grpc.CallOptions;
35
import io.grpc.ChannelLogger;
36
import io.grpc.ChannelLogger.ChannelLogLevel;
37
import io.grpc.ClientStreamTracer;
38
import io.grpc.ClientTransportFilter;
39
import io.grpc.ConnectivityState;
40
import io.grpc.ConnectivityStateInfo;
41
import io.grpc.EquivalentAddressGroup;
42
import io.grpc.HttpConnectProxiedSocketAddress;
43
import io.grpc.InternalChannelz;
44
import io.grpc.InternalChannelz.ChannelStats;
45
import io.grpc.InternalInstrumented;
46
import io.grpc.InternalLogId;
47
import io.grpc.InternalWithLogId;
48
import io.grpc.LoadBalancer;
49
import io.grpc.Metadata;
50
import io.grpc.MethodDescriptor;
51
import io.grpc.MetricRecorder;
52
import io.grpc.NameResolver;
53
import io.grpc.SecurityLevel;
54
import io.grpc.Status;
55
import io.grpc.SynchronizationContext;
56
import io.grpc.SynchronizationContext.ScheduledHandle;
57
import java.net.SocketAddress;
58
import java.util.ArrayList;
59
import java.util.Collection;
60
import java.util.Collections;
61
import java.util.List;
62
import java.util.concurrent.ScheduledExecutorService;
63
import java.util.concurrent.TimeUnit;
64
import javax.annotation.Nullable;
65
import javax.annotation.concurrent.ThreadSafe;
66

67
/**
68
 * Transports for a single {@link SocketAddress}.
69
 */
70
@ThreadSafe
71
final class InternalSubchannel implements InternalInstrumented<ChannelStats>, TransportProvider {
72

73
  private final InternalLogId logId;
74
  private final String authority;
75
  private final String userAgent;
76
  private final BackoffPolicy.Provider backoffPolicyProvider;
77
  private final Callback callback;
78
  private final ClientTransportFactory transportFactory;
79
  private final ScheduledExecutorService scheduledExecutor;
80
  private final InternalChannelz channelz;
81
  private final CallTracer callsTracer;
82
  private final ChannelTracer channelTracer;
83
  private final MetricRecorder metricRecorder;
84
  private final ChannelLogger channelLogger;
85
  private final boolean reconnectDisabled;
86

87
  private final List<ClientTransportFilter> transportFilters;
88

89
  /**
90
   * All fields must be mutated in the syncContext.
91
   */
92
  private final SynchronizationContext syncContext;
93

94
  /**
95
   * The index of the address corresponding to pendingTransport/activeTransport, or at beginning if
96
   * both are null.
97
   *
98
   * <p>Note: any {@link Index#updateGroups(List)} should also update {@link #addressGroups}.
99
   */
100
  private final Index addressIndex;
101

102
  /**
103
   * A volatile accessor to {@link Index#getGroups()}. A few methods ({@link
104
   * #getAddressGroups()} and {@link #toString()}) read this value from outside the {@link
105
   * #syncContext}, where the index itself must be accessed. Ideally the list held by {@link
106
   * Index} could be volatile, so that this separate accessor would not be needed. However,
107
   * keeping this accessor avoids unnecessary volatile reads and makes the intent clearer.
108
   */
109
  private volatile List<EquivalentAddressGroup> addressGroups;
110

111
  /**
112
   * The policy to control back off between reconnects. Non-{@code null} when a reconnect task is
113
   * scheduled.
114
   */
115
  private BackoffPolicy reconnectPolicy;
116

117
  /**
118
   * Timer monitoring duration since entering CONNECTING state.
119
   */
120
  private final Stopwatch connectingTimer;
121

122
  @Nullable
123
  private ScheduledHandle reconnectTask;
124
  @Nullable
125
  private ScheduledHandle shutdownDueToUpdateTask;
126
  @Nullable
127
  private ManagedClientTransport shutdownDueToUpdateTransport;
128

129
  /**
130
   * All transports that are not terminated. At the very least the value of {@link #activeTransport}
131
   * will be present, but previously used transports that still have streams or are stopping may
132
   * also be present.
133
   */
134
  private final Collection<ConnectionClientTransport> transports = new ArrayList<>();
1✔
135

136
  // Must only be used from syncContext
137
  private final InUseStateAggregator<ConnectionClientTransport> inUseStateAggregator =
1✔
138
      new InUseStateAggregator<ConnectionClientTransport>() {
1✔
139
        @Override
140
        protected void handleInUse() {
141
          callback.onInUse(InternalSubchannel.this);
1✔
142
        }
1✔
143

144
        @Override
145
        protected void handleNotInUse() {
146
          callback.onNotInUse(InternalSubchannel.this);
1✔
147
        }
1✔
148
      };
149

150
  /**
151
   * The to-be active transport, which is not ready yet.
152
   */
153
  @Nullable
154
  private ConnectionClientTransport pendingTransport;
155

156
  /**
157
   * The transport for new outgoing requests. Non-null only in READY state.
158
   */
159
  @Nullable
160
  private volatile ManagedClientTransport activeTransport;
161

162
  private volatile ConnectivityStateInfo state = ConnectivityStateInfo.forNonError(IDLE);
1✔
163

164
  private Status shutdownReason;
165

166
  private volatile Attributes connectedAddressAttributes;
167
  private final SubchannelMetrics subchannelMetrics;
168
  private final String target;
169

170
  InternalSubchannel(LoadBalancer.CreateSubchannelArgs args, String authority, String userAgent,
171
                     BackoffPolicy.Provider backoffPolicyProvider,
172
                     ClientTransportFactory transportFactory,
173
                     ScheduledExecutorService scheduledExecutor,
174
                     Supplier<Stopwatch> stopwatchSupplier, SynchronizationContext syncContext,
175
                     Callback callback, InternalChannelz channelz, CallTracer callsTracer,
176
                     ChannelTracer channelTracer, InternalLogId logId,
177
                     ChannelLogger channelLogger, List<ClientTransportFilter> transportFilters,
178
                     String target,
179
                     MetricRecorder metricRecorder) {
1✔
180
    List<EquivalentAddressGroup> addressGroups = args.getAddresses();
1✔
181
    Preconditions.checkNotNull(addressGroups, "addressGroups");
1✔
182
    Preconditions.checkArgument(!addressGroups.isEmpty(), "addressGroups is empty");
1✔
183
    checkListHasNoNulls(addressGroups, "addressGroups contains null entry");
1✔
184
    List<EquivalentAddressGroup> unmodifiableAddressGroups =
1✔
185
        Collections.unmodifiableList(new ArrayList<>(addressGroups));
1✔
186
    this.addressGroups = unmodifiableAddressGroups;
1✔
187
    this.addressIndex = new Index(unmodifiableAddressGroups);
1✔
188
    this.authority = authority;
1✔
189
    this.userAgent = userAgent;
1✔
190
    this.backoffPolicyProvider = backoffPolicyProvider;
1✔
191
    this.transportFactory = transportFactory;
1✔
192
    this.scheduledExecutor = scheduledExecutor;
1✔
193
    this.connectingTimer = stopwatchSupplier.get();
1✔
194
    this.syncContext = syncContext;
1✔
195
    this.metricRecorder = metricRecorder;
1✔
196
    this.callback = callback;
1✔
197
    this.channelz = channelz;
1✔
198
    this.callsTracer = callsTracer;
1✔
199
    this.channelTracer = Preconditions.checkNotNull(channelTracer, "channelTracer");
1✔
200
    this.logId = Preconditions.checkNotNull(logId, "logId");
1✔
201
    this.channelLogger = Preconditions.checkNotNull(channelLogger, "channelLogger");
1✔
202
    this.transportFilters = transportFilters;
1✔
203
    this.reconnectDisabled = args.getOption(LoadBalancer.DISABLE_SUBCHANNEL_RECONNECT_KEY);
1✔
204
    this.target = target;
1✔
205
    this.subchannelMetrics = new SubchannelMetrics(metricRecorder);
1✔
206
  }
1✔
207

208
  ChannelLogger getChannelLogger() {
209
    return channelLogger;
1✔
210
  }
211

212
  @Override
213
  public ClientTransport obtainActiveTransport() {
214
    ClientTransport savedTransport = activeTransport;
1✔
215
    if (savedTransport != null) {
1✔
216
      return savedTransport;
1✔
217
    }
218
    syncContext.execute(new Runnable() {
1✔
219
      @Override
220
      public void run() {
221
        if (state.getState() == IDLE) {
1✔
222
          channelLogger.log(ChannelLogLevel.INFO, "CONNECTING as requested");
1✔
223
          gotoNonErrorState(CONNECTING);
1✔
224
          startNewTransport();
1✔
225
        }
226
      }
1✔
227
    });
228
    return null;
1✔
229
  }
230

231
  /**
232
   * Returns a READY transport if there is any, without trying to connect.
233
   */
234
  @Nullable
235
  ClientTransport getTransport() {
236
    return activeTransport;
1✔
237
  }
238

239
  /**
240
   * Returns the authority string associated with this Subchannel.
241
   */
242
  String getAuthority() {
243
    return authority;
×
244
  }
245

246
  private void startNewTransport() {
247
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
248

249
    Preconditions.checkState(reconnectTask == null, "Should have no reconnectTask scheduled");
1✔
250

251
    if (addressIndex.isAtBeginning()) {
1✔
252
      connectingTimer.reset().start();
1✔
253
    }
254
    SocketAddress address = addressIndex.getCurrentAddress();
1✔
255

256
    HttpConnectProxiedSocketAddress proxiedAddr = null;
1✔
257
    if (address instanceof HttpConnectProxiedSocketAddress) {
1✔
258
      proxiedAddr = (HttpConnectProxiedSocketAddress) address;
×
259
      address = proxiedAddr.getTargetAddress();
×
260
    }
261

262
    Attributes currentEagAttributes = addressIndex.getCurrentEagAttributes();
1✔
263
    String eagChannelAuthority = currentEagAttributes
1✔
264
            .get(EquivalentAddressGroup.ATTR_AUTHORITY_OVERRIDE);
1✔
265
    ClientTransportFactory.ClientTransportOptions options =
1✔
266
        new ClientTransportFactory.ClientTransportOptions()
267
          .setAuthority(eagChannelAuthority != null ? eagChannelAuthority : authority)
1✔
268
          .setEagAttributes(currentEagAttributes)
1✔
269
          .setUserAgent(userAgent)
1✔
270
          .setMetricRecorder(metricRecorder)
1✔
271
          .setHttpConnectProxiedSocketAddress(proxiedAddr);
1✔
272
    TransportLogger transportLogger = new TransportLogger();
1✔
273
    // In case the transport logs in the constructor, use the subchannel logId
274
    transportLogger.logId = getLogId();
1✔
275
    ConnectionClientTransport transport =
1✔
276
        new CallTracingTransport(
277
            transportFactory
278
                .newClientTransport(address, options, transportLogger), callsTracer);
1✔
279
    transportLogger.logId = transport.getLogId();
1✔
280
    channelz.addClientSocket(transport);
1✔
281
    pendingTransport = transport;
1✔
282
    transports.add(transport);
1✔
283
    Runnable runnable = transport.start(new TransportListener(transport));
1✔
284
    if (runnable != null) {
1✔
285
      syncContext.executeLater(runnable);
1✔
286
    }
287
    channelLogger.log(ChannelLogLevel.INFO, "Started transport {0}", transportLogger.logId);
1✔
288
  }
1✔
289

290
  /**
291
   * Only called after all addresses attempted and failed (TRANSIENT_FAILURE).
292
   * @param status the causal status when the channel begins transition to
293
   *     TRANSIENT_FAILURE.
294
   */
295
  private void scheduleBackoff(final Status status) {
296
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
297

298
    class EndOfCurrentBackoff implements Runnable {
1✔
299
      @Override
300
      public void run() {
301
        reconnectTask = null;
1✔
302
        channelLogger.log(ChannelLogLevel.INFO, "CONNECTING after backoff");
1✔
303
        gotoNonErrorState(CONNECTING);
1✔
304
        startNewTransport();
1✔
305
      }
1✔
306
    }
307

308
    gotoState(ConnectivityStateInfo.forTransientFailure(status));
1✔
309

310
    if (reconnectDisabled) {
1✔
311
      return;
1✔
312
    }
313

314
    if (reconnectPolicy == null) {
1✔
315
      reconnectPolicy = backoffPolicyProvider.get();
1✔
316
    }
317
    long delayNanos =
1✔
318
        reconnectPolicy.nextBackoffNanos() - connectingTimer.elapsed(TimeUnit.NANOSECONDS);
1✔
319
    channelLogger.log(
1✔
320
        ChannelLogLevel.INFO,
321
        "TRANSIENT_FAILURE ({0}). Will reconnect after {1} ns",
322
        printShortStatus(status), delayNanos);
1✔
323
    Preconditions.checkState(reconnectTask == null, "previous reconnectTask is not done");
1✔
324
    reconnectTask = syncContext.schedule(
1✔
325
        new EndOfCurrentBackoff(),
326
        delayNanos,
327
        TimeUnit.NANOSECONDS,
328
        scheduledExecutor);
329
  }
1✔
330

331
  /**
332
   * Immediately attempt to reconnect if the current state is TRANSIENT_FAILURE. Otherwise, this
333
   * method has no effect.
334
   */
335
  void resetConnectBackoff() {
336
    syncContext.execute(new Runnable() {
1✔
337
      @Override
338
      public void run() {
339
        if (state.getState() != TRANSIENT_FAILURE) {
1✔
340
          return;
1✔
341
        }
342
        cancelReconnectTask();
1✔
343
        channelLogger.log(ChannelLogLevel.INFO, "CONNECTING; backoff interrupted");
1✔
344
        gotoNonErrorState(CONNECTING);
1✔
345
        startNewTransport();
1✔
346
      }
1✔
347
    });
348
  }
1✔
349

350
  private void gotoNonErrorState(final ConnectivityState newState) {
351
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
352

353
    gotoState(ConnectivityStateInfo.forNonError(newState));
1✔
354
  }
1✔
355

356
  private void gotoState(final ConnectivityStateInfo newState) {
357
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
358

359
    if (state.getState() != newState.getState()) {
1✔
360
      Preconditions.checkState(state.getState() != SHUTDOWN,
1✔
361
          "Cannot transition out of SHUTDOWN to %s", newState.getState());
1✔
362
      if (reconnectDisabled && newState.getState() == TRANSIENT_FAILURE) {
1✔
363
        state = ConnectivityStateInfo.forNonError(IDLE);
1✔
364
      } else {
365
        state = newState;
1✔
366
      }
367
      callback.onStateChange(InternalSubchannel.this, newState);
1✔
368
    }
369
  }
1✔
370

371
  /** Replaces the existing addresses, avoiding unnecessary reconnects. */
372
  public void updateAddresses(final List<EquivalentAddressGroup> newAddressGroups) {
373
    Preconditions.checkNotNull(newAddressGroups, "newAddressGroups");
1✔
374
    checkListHasNoNulls(newAddressGroups, "newAddressGroups contains null entry");
1✔
375
    Preconditions.checkArgument(!newAddressGroups.isEmpty(), "newAddressGroups is empty");
1✔
376
    final List<EquivalentAddressGroup> newImmutableAddressGroups =
1✔
377
        Collections.unmodifiableList(new ArrayList<>(newAddressGroups));
1✔
378

379
    syncContext.execute(new Runnable() {
1✔
380
      @Override
381
      public void run() {
382
        ManagedClientTransport savedTransport = null;
1✔
383
        SocketAddress previousAddress = addressIndex.getCurrentAddress();
1✔
384
        addressIndex.updateGroups(newImmutableAddressGroups);
1✔
385
        addressGroups = newImmutableAddressGroups;
1✔
386
        if (state.getState() == READY || state.getState() == CONNECTING) {
1✔
387
          if (!addressIndex.seekTo(previousAddress)) {
1✔
388
            // Forced to drop the connection
389
            if (state.getState() == READY) {
1✔
390
              savedTransport = activeTransport;
1✔
391
              activeTransport = null;
1✔
392
              addressIndex.reset();
1✔
393
              gotoNonErrorState(IDLE);
1✔
394
            } else {
395
              pendingTransport.shutdown(
1✔
396
                  Status.UNAVAILABLE.withDescription(
1✔
397
                    "InternalSubchannel closed pending transport due to address change"));
398
              pendingTransport = null;
1✔
399
              addressIndex.reset();
1✔
400
              startNewTransport();
1✔
401
            }
402
          }
403
        }
404
        if (savedTransport != null) {
1✔
405
          if (shutdownDueToUpdateTask != null) {
1✔
406
            // Keeping track of multiple shutdown tasks adds complexity, and shouldn't generally be
407
            // necessary. This transport has probably already had plenty of time.
408
            shutdownDueToUpdateTransport.shutdown(
1✔
409
                Status.UNAVAILABLE.withDescription(
1✔
410
                    "InternalSubchannel closed transport early due to address change"));
411
            shutdownDueToUpdateTask.cancel();
1✔
412
            shutdownDueToUpdateTask = null;
1✔
413
            shutdownDueToUpdateTransport = null;
1✔
414
          }
415
          // Avoid needless RPC failures by delaying the shutdown. See
416
          // https://github.com/grpc/grpc-java/issues/2562
417
          shutdownDueToUpdateTransport = savedTransport;
1✔
418
          shutdownDueToUpdateTask = syncContext.schedule(
1✔
419
              new Runnable() {
1✔
420
                @Override public void run() {
421
                  ManagedClientTransport transport = shutdownDueToUpdateTransport;
1✔
422
                  shutdownDueToUpdateTask = null;
1✔
423
                  shutdownDueToUpdateTransport = null;
1✔
424
                  transport.shutdown(
1✔
425
                      Status.UNAVAILABLE.withDescription(
1✔
426
                          "InternalSubchannel closed transport due to address change"));
427
                }
1✔
428
              },
429
              ManagedChannelImpl.SUBCHANNEL_SHUTDOWN_DELAY_SECONDS,
430
              TimeUnit.SECONDS,
431
              scheduledExecutor);
1✔
432
        }
433
      }
1✔
434
    });
435
  }
1✔
436

437
  public void shutdown(final Status reason) {
438
    syncContext.execute(new Runnable() {
1✔
439
      @Override
440
      public void run() {
441
        ManagedClientTransport savedActiveTransport;
442
        ConnectionClientTransport savedPendingTransport;
443
        if (state.getState() == SHUTDOWN) {
1✔
444
          return;
1✔
445
        }
446
        shutdownReason = reason;
1✔
447
        savedActiveTransport = activeTransport;
1✔
448
        savedPendingTransport = pendingTransport;
1✔
449
        activeTransport = null;
1✔
450
        pendingTransport = null;
1✔
451
        gotoNonErrorState(SHUTDOWN);
1✔
452
        addressIndex.reset();
1✔
453
        if (transports.isEmpty()) {
1✔
454
          handleTermination();
1✔
455
        }  // else: the callback will be run once all transports have been terminated
456
        cancelReconnectTask();
1✔
457
        if (shutdownDueToUpdateTask != null) {
1✔
458
          shutdownDueToUpdateTask.cancel();
1✔
459
          shutdownDueToUpdateTransport.shutdown(reason);
1✔
460
          shutdownDueToUpdateTask = null;
1✔
461
          shutdownDueToUpdateTransport = null;
1✔
462
        }
463
        if (savedActiveTransport != null) {
1✔
464
          savedActiveTransport.shutdown(reason);
1✔
465
        }
466
        if (savedPendingTransport != null) {
1✔
467
          savedPendingTransport.shutdown(reason);
1✔
468
        }
469
      }
1✔
470
    });
471
  }
1✔
472

473
  @Override
474
  public String toString() {
475
    // addressGroupsCopy being a little stale is fine, just avoid calling toString with the lock
476
    // since there may be many addresses.
477
    return MoreObjects.toStringHelper(this)
×
478
        .add("logId", logId.getId())
×
479
        .add("addressGroups", addressGroups)
×
480
        .toString();
×
481
  }
482

483
  private void handleTermination() {
484
    syncContext.execute(new Runnable() {
1✔
485
      @Override
486
      public void run() {
487
        channelLogger.log(ChannelLogLevel.INFO, "Terminated");
1✔
488
        callback.onTerminated(InternalSubchannel.this);
1✔
489
      }
1✔
490
    });
491
  }
1✔
492

493
  private void handleTransportInUseState(
494
      final ConnectionClientTransport transport, final boolean inUse) {
495
    syncContext.execute(new Runnable() {
1✔
496
      @Override
497
      public void run() {
498
        inUseStateAggregator.updateObjectInUse(transport, inUse);
1✔
499
      }
1✔
500
    });
501
  }
1✔
502

503
  void shutdownNow(final Status reason) {
504
    shutdown(reason);
1✔
505
    syncContext.execute(new Runnable() {
1✔
506
      @Override
507
      public void run() {
508
        Collection<ManagedClientTransport> transportsCopy =
1✔
509
            new ArrayList<ManagedClientTransport>(transports);
1✔
510

511
        for (ManagedClientTransport transport : transportsCopy) {
1✔
512
          transport.shutdownNow(reason);
1✔
513
        }
1✔
514
      }
1✔
515
    });
516
  }
1✔
517

518
  List<EquivalentAddressGroup> getAddressGroups() {
519
    return addressGroups;
1✔
520
  }
521

522
  private void cancelReconnectTask() {
523
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
524

525
    if (reconnectTask != null) {
1✔
526
      reconnectTask.cancel();
1✔
527
      reconnectTask = null;
1✔
528
      reconnectPolicy = null;
1✔
529
    }
530
  }
1✔
531

532
  @Override
533
  public InternalLogId getLogId() {
534
    return logId;
1✔
535
  }
536

537
  @Override
538
  public ListenableFuture<ChannelStats> getStats() {
539
    final SettableFuture<ChannelStats> channelStatsFuture = SettableFuture.create();
1✔
540
    syncContext.execute(new Runnable() {
1✔
541
      @Override
542
      public void run() {
543
        ChannelStats.Builder builder = new ChannelStats.Builder();
1✔
544
        List<EquivalentAddressGroup> addressGroupsSnapshot = addressIndex.getGroups();
1✔
545
        List<InternalWithLogId> transportsSnapshot = new ArrayList<InternalWithLogId>(transports);
1✔
546
        builder.setTarget(addressGroupsSnapshot.toString()).setState(getState());
1✔
547
        builder.setSockets(transportsSnapshot);
1✔
548
        callsTracer.updateBuilder(builder);
1✔
549
        channelTracer.updateBuilder(builder);
1✔
550
        channelStatsFuture.set(builder.build());
1✔
551
      }
1✔
552
    });
553
    return channelStatsFuture;
1✔
554
  }
555

556
  /**
557
   * Return attributes for server address connected by sub channel.
558
   */
559
  public Attributes getConnectedAddressAttributes() {
560
    return connectedAddressAttributes;
1✔
561
  }
562

563
  ConnectivityState getState() {
564
    return state.getState();
1✔
565
  }
566

567
  private static void checkListHasNoNulls(List<?> list, String msg) {
568
    for (Object item : list) {
1✔
569
      Preconditions.checkNotNull(item, msg);
1✔
570
    }
1✔
571
  }
1✔
572

573
  /** Listener for real transports. */
574
  private class TransportListener implements ManagedClientTransport.Listener {
575
    final ConnectionClientTransport transport;
576
    boolean shutdownInitiated = false;
1✔
577

578
    TransportListener(ConnectionClientTransport transport) {
1✔
579
      this.transport = transport;
1✔
580
    }
1✔
581

582
    @Override
583
    public Attributes filterTransport(Attributes attributes) {
584
      for (ClientTransportFilter filter : transportFilters) {
1✔
585
        attributes = Preconditions.checkNotNull(filter.transportReady(attributes),
1✔
586
            "Filter %s returned null", filter);
587
      }
1✔
588
      return attributes;
1✔
589
    }
590

591
    @Override
592
    public void transportReady() {
593
      channelLogger.log(ChannelLogLevel.INFO, "READY");
1✔
594
      syncContext.execute(new Runnable() {
1✔
595
        @Override
596
        public void run() {
597
          reconnectPolicy = null;
1✔
598
          if (shutdownReason != null) {
1✔
599
            // activeTransport should have already been set to null by shutdown(). We keep it null.
600
            Preconditions.checkState(activeTransport == null,
1✔
601
                "Unexpected non-null activeTransport");
602
            transport.shutdown(shutdownReason);
1✔
603
          } else if (pendingTransport == transport) {
1✔
604
            activeTransport = transport;
1✔
605
            pendingTransport = null;
1✔
606
            connectedAddressAttributes = addressIndex.getCurrentEagAttributes();
1✔
607
            gotoNonErrorState(READY);
1✔
608
            subchannelMetrics.recordConnectionAttemptSucceeded(/* target= */ target,
1✔
609
                /* backendService= */ getAttributeOrDefault(
1✔
610
                    addressIndex.getCurrentEagAttributes(), NameResolver.ATTR_BACKEND_SERVICE),
1✔
611
                /* locality= */ getAttributeOrDefault(addressIndex.getCurrentEagAttributes(),
1✔
612
                    EquivalentAddressGroup.ATTR_LOCALITY_NAME),
613
                /* securityLevel= */ extractSecurityLevel(addressIndex.getCurrentEagAttributes()
1✔
614
                    .get(GrpcAttributes.ATTR_SECURITY_LEVEL)));
1✔
615
          }
616
        }
1✔
617
      });
618
    }
1✔
619

620
    @Override
621
    public void transportInUse(boolean inUse) {
622
      handleTransportInUseState(transport, inUse);
1✔
623
    }
1✔
624

625
    @Override
626
    public void transportShutdown(final Status s, final DisconnectError disconnectError) {
627
      channelLogger.log(
1✔
628
          ChannelLogLevel.INFO, "{0} SHUTDOWN with {1}", transport.getLogId(), printShortStatus(s));
1✔
629
      shutdownInitiated = true;
1✔
630
      syncContext.execute(new Runnable() {
1✔
631
        @Override
632
        public void run() {
633
          if (state.getState() == SHUTDOWN) {
1✔
634
            return;
1✔
635
          }
636
          if (activeTransport == transport) {
1✔
637
            activeTransport = null;
1✔
638
            addressIndex.reset();
1✔
639
            gotoNonErrorState(IDLE);
1✔
640
            subchannelMetrics.recordDisconnection(/* target= */ target,
1✔
641
                /* backendService= */ getAttributeOrDefault(addressIndex.getCurrentEagAttributes(),
1✔
642
                    NameResolver.ATTR_BACKEND_SERVICE),
643
                /* locality= */ getAttributeOrDefault(addressIndex.getCurrentEagAttributes(),
1✔
644
                    EquivalentAddressGroup.ATTR_LOCALITY_NAME),
645
                /* disconnectError= */ disconnectError.toErrorString(),
1✔
646
                /* securityLevel= */ extractSecurityLevel(addressIndex.getCurrentEagAttributes()
1✔
647
                    .get(GrpcAttributes.ATTR_SECURITY_LEVEL)));
1✔
648
          } else if (pendingTransport == transport) {
1✔
649
            subchannelMetrics.recordConnectionAttemptFailed(/* target= */ target,
1✔
650
                /* backendService= */getAttributeOrDefault(addressIndex.getCurrentEagAttributes(),
1✔
651
                    NameResolver.ATTR_BACKEND_SERVICE),
652
                /* locality= */ getAttributeOrDefault(addressIndex.getCurrentEagAttributes(),
1✔
653
                    EquivalentAddressGroup.ATTR_LOCALITY_NAME));
654
            Preconditions.checkState(state.getState() == CONNECTING,
1✔
655
                "Expected state is CONNECTING, actual state is %s", state.getState());
1✔
656
            addressIndex.increment();
1✔
657
            // Continue to reconnect if there are still addresses to try.
658
            if (!addressIndex.isValid()) {
1✔
659
              pendingTransport = null;
1✔
660
              addressIndex.reset();
1✔
661
              // Initiate backoff
662
              // Transition to TRANSIENT_FAILURE
663
              scheduleBackoff(s);
1✔
664
            } else {
665
              startNewTransport();
1✔
666
            }
667
          }
668
        }
1✔
669
      });
670
    }
1✔
671

672
    @Override
673
    public void transportTerminated() {
674
      Preconditions.checkState(
1✔
675
          shutdownInitiated, "transportShutdown() must be called before transportTerminated().");
676

677
      channelLogger.log(ChannelLogLevel.INFO, "{0} Terminated", transport.getLogId());
1✔
678
      channelz.removeClientSocket(transport);
1✔
679
      handleTransportInUseState(transport, false);
1✔
680
      for (ClientTransportFilter filter : transportFilters) {
1✔
681
        filter.transportTerminated(transport.getAttributes());
1✔
682
      }
1✔
683
      syncContext.execute(new Runnable() {
1✔
684
        @Override
685
        public void run() {
686
          transports.remove(transport);
1✔
687
          if (state.getState() == SHUTDOWN && transports.isEmpty()) {
1✔
688
            handleTermination();
1✔
689
          }
690
        }
1✔
691
      });
692
    }
1✔
693

694
    private String extractSecurityLevel(SecurityLevel securityLevel) {
695
      if (securityLevel == null) {
1✔
696
        return "none";
1✔
697
      }
698
      switch (securityLevel) {
1✔
699
        case NONE:
700
          return "none";
×
701
        case INTEGRITY:
702
          return "integrity_only";
×
703
        case PRIVACY_AND_INTEGRITY:
704
          return "privacy_and_integrity";
1✔
705
        default:
706
          throw new IllegalArgumentException("Unknown SecurityLevel: " + securityLevel);
×
707
      }
708
    }
709

710
    private String getAttributeOrDefault(Attributes attributes, Attributes.Key<String> key) {
711
      String value = attributes.get(key);
1✔
712
      return value == null ? "" : value;
1✔
713
    }
714
  }
715

716
  // All methods are called in syncContext
717
  abstract static class Callback {
1✔
718
    /**
719
     * Called when the subchannel is terminated, which means it's shut down and all transports
720
     * have been terminated.
721
     */
722
    @ForOverride
723
    void onTerminated(InternalSubchannel is) { }
×
724

725
    /**
726
     * Called when the subchannel's connectivity state has changed.
727
     */
728
    @ForOverride
729
    void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) { }
×
730

731
    /**
732
     * Called when the subchannel's in-use state has changed to true, which means at least one
733
     * transport is in use.
734
     */
735
    @ForOverride
736
    void onInUse(InternalSubchannel is) { }
×
737

738
    /**
739
     * Called when the subchannel's in-use state has changed to false, which means no transport is
740
     * in use.
741
     */
742
    @ForOverride
743
    void onNotInUse(InternalSubchannel is) { }
×
744
  }
745

746
  @VisibleForTesting
747
  static final class CallTracingTransport extends ForwardingConnectionClientTransport {
748
    private final ConnectionClientTransport delegate;
749
    private final CallTracer callTracer;
750

751
    private CallTracingTransport(ConnectionClientTransport delegate, CallTracer callTracer) {
1✔
752
      this.delegate = delegate;
1✔
753
      this.callTracer = callTracer;
1✔
754
    }
1✔
755

756
    @Override
757
    protected ConnectionClientTransport delegate() {
758
      return delegate;
1✔
759
    }
760

761
    @Override
762
    public ClientStream newStream(
763
        MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions,
764
        ClientStreamTracer[] tracers) {
765
      final ClientStream streamDelegate = super.newStream(method, headers, callOptions, tracers);
1✔
766
      return new ForwardingClientStream() {
1✔
767
        @Override
768
        protected ClientStream delegate() {
769
          return streamDelegate;
1✔
770
        }
771

772
        @Override
773
        public void start(final ClientStreamListener listener) {
774
          callTracer.reportCallStarted();
1✔
775
          super.start(new ForwardingClientStreamListener() {
1✔
776
            @Override
777
            protected ClientStreamListener delegate() {
778
              return listener;
1✔
779
            }
780

781
            @Override
782
            public void closed(
783
                Status status, RpcProgress rpcProgress, Metadata trailers) {
784
              callTracer.reportCallEnded(status.isOk());
1✔
785
              super.closed(status, rpcProgress, trailers);
1✔
786
            }
1✔
787
          });
788
        }
1✔
789
      };
790
    }
791
  }
792

793
  /** Index as in 'i', the pointer to an entry. Not a "search index." */
794
  @VisibleForTesting
795
  static final class Index {
796
    private List<EquivalentAddressGroup> addressGroups;
797
    private int groupIndex;
798
    private int addressIndex;
799

800
    public Index(List<EquivalentAddressGroup> groups) {
1✔
801
      this.addressGroups = groups;
1✔
802
    }
1✔
803

804
    public boolean isValid() {
805
      // addressIndex will never be invalid
806
      return groupIndex < addressGroups.size();
1✔
807
    }
808

809
    public boolean isAtBeginning() {
810
      return groupIndex == 0 && addressIndex == 0;
1✔
811
    }
812

813
    public void increment() {
814
      EquivalentAddressGroup group = addressGroups.get(groupIndex);
1✔
815
      addressIndex++;
1✔
816
      if (addressIndex >= group.getAddresses().size()) {
1✔
817
        groupIndex++;
1✔
818
        addressIndex = 0;
1✔
819
      }
820
    }
1✔
821

822
    public void reset() {
823
      groupIndex = 0;
1✔
824
      addressIndex = 0;
1✔
825
    }
1✔
826

827
    public SocketAddress getCurrentAddress() {
828
      return addressGroups.get(groupIndex).getAddresses().get(addressIndex);
1✔
829
    }
830

831
    public Attributes getCurrentEagAttributes() {
832
      return addressGroups.get(groupIndex).getAttributes();
1✔
833
    }
834

835
    public List<EquivalentAddressGroup> getGroups() {
836
      return addressGroups;
1✔
837
    }
838

839
    /** Update to new groups, resetting the current index. */
840
    public void updateGroups(List<EquivalentAddressGroup> newGroups) {
841
      addressGroups = newGroups;
1✔
842
      reset();
1✔
843
    }
1✔
844

845
    /** Returns false if the needle was not found and the current index was left unchanged. */
846
    public boolean seekTo(SocketAddress needle) {
847
      for (int i = 0; i < addressGroups.size(); i++) {
1✔
848
        EquivalentAddressGroup group = addressGroups.get(i);
1✔
849
        int j = group.getAddresses().indexOf(needle);
1✔
850
        if (j == -1) {
1✔
851
          continue;
1✔
852
        }
853
        this.groupIndex = i;
1✔
854
        this.addressIndex = j;
1✔
855
        return true;
1✔
856
      }
857
      return false;
1✔
858
    }
859
  }
860

861
  private String printShortStatus(Status status) {
862
    StringBuilder buffer = new StringBuilder();
1✔
863
    buffer.append(status.getCode());
1✔
864
    if (status.getDescription() != null) {
1✔
865
      buffer.append("(").append(status.getDescription()).append(")");
1✔
866
    }
867
    if (status.getCause() != null) {
1✔
868
      buffer.append("[").append(status.getCause()).append("]");
1✔
869
    }
870
    return buffer.toString();
1✔
871
  }
872

873
  @VisibleForTesting
874
  static final class TransportLogger extends ChannelLogger {
1✔
875
    // Changed just after construction to break a cyclic dependency.
876
    InternalLogId logId;
877

878
    @Override
879
    public void log(ChannelLogLevel level, String message) {
880
      ChannelLoggerImpl.logOnly(logId, level, message);
1✔
881
    }
1✔
882

883
    @Override
884
    public void log(ChannelLogLevel level, String messageFormat, Object... args) {
885
      ChannelLoggerImpl.logOnly(logId, level, messageFormat, args);
1✔
886
    }
1✔
887
  }
888
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc