• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #20241

14 Apr 2026 05:21PM UTC coverage: 88.817% (+0.005%) from 88.812%
#20241

push

github

web-flow
core,xds: Fix backend_service plumbing for subchannel metrics (#12735)

This PR fixes #12432.

Subchannel metrics read backend_service from EAG attributes, but xDS
currently only populates the resolution result attribute. As a result,
grpc.lb.backend_service is left unset for subchannel metrics in the cds
path.

This change adds an internal EAG-level backend_service attribute in cds
and has InternalSubchannel read that attribute for subchannel metrics,
while keeping a fallback to the existing resolution result attribute.

This PR is intentionally scoped to subchannel metrics only and does not
attempt the broader #12431 plumbing changes.

36025 of 40561 relevant lines covered (88.82%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.33
/../core/src/main/java/io/grpc/internal/InternalSubchannel.java
1
/*
2
 * Copyright 2015 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import static io.grpc.ConnectivityState.CONNECTING;
20
import static io.grpc.ConnectivityState.IDLE;
21
import static io.grpc.ConnectivityState.READY;
22
import static io.grpc.ConnectivityState.SHUTDOWN;
23
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
24

25
import com.google.common.annotations.VisibleForTesting;
26
import com.google.common.base.MoreObjects;
27
import com.google.common.base.Preconditions;
28
import com.google.common.base.Stopwatch;
29
import com.google.common.base.Supplier;
30
import com.google.common.util.concurrent.ListenableFuture;
31
import com.google.common.util.concurrent.SettableFuture;
32
import com.google.errorprone.annotations.ForOverride;
33
import io.grpc.Attributes;
34
import io.grpc.CallOptions;
35
import io.grpc.ChannelLogger;
36
import io.grpc.ChannelLogger.ChannelLogLevel;
37
import io.grpc.ClientStreamTracer;
38
import io.grpc.ClientTransportFilter;
39
import io.grpc.ConnectivityState;
40
import io.grpc.ConnectivityStateInfo;
41
import io.grpc.EquivalentAddressGroup;
42
import io.grpc.HttpConnectProxiedSocketAddress;
43
import io.grpc.InternalChannelz;
44
import io.grpc.InternalChannelz.ChannelStats;
45
import io.grpc.InternalEquivalentAddressGroup;
46
import io.grpc.InternalInstrumented;
47
import io.grpc.InternalLogId;
48
import io.grpc.InternalWithLogId;
49
import io.grpc.LoadBalancer;
50
import io.grpc.Metadata;
51
import io.grpc.MethodDescriptor;
52
import io.grpc.MetricRecorder;
53
import io.grpc.NameResolver;
54
import io.grpc.SecurityLevel;
55
import io.grpc.Status;
56
import io.grpc.SynchronizationContext;
57
import io.grpc.SynchronizationContext.ScheduledHandle;
58
import java.net.SocketAddress;
59
import java.util.ArrayList;
60
import java.util.Collection;
61
import java.util.Collections;
62
import java.util.List;
63
import java.util.concurrent.ScheduledExecutorService;
64
import java.util.concurrent.TimeUnit;
65
import javax.annotation.Nullable;
66
import javax.annotation.concurrent.ThreadSafe;
67

68
/**
69
 * Transports for a single {@link SocketAddress}.
70
 */
71
@ThreadSafe
72
final class InternalSubchannel implements InternalInstrumented<ChannelStats>, TransportProvider {
73

74
  private final InternalLogId logId;
75
  private final String authority;
76
  private final String userAgent;
77
  private final BackoffPolicy.Provider backoffPolicyProvider;
78
  private final Callback callback;
79
  private final ClientTransportFactory transportFactory;
80
  private final ScheduledExecutorService scheduledExecutor;
81
  private final InternalChannelz channelz;
82
  private final CallTracer callsTracer;
83
  private final ChannelTracer channelTracer;
84
  private final MetricRecorder metricRecorder;
85
  private final ChannelLogger channelLogger;
86
  private final boolean reconnectDisabled;
87

88
  private final List<ClientTransportFilter> transportFilters;
89

90
  /**
91
   * All fields must be mutated in the syncContext.
92
   */
93
  private final SynchronizationContext syncContext;
94

95
  /**
96
   * The index of the address corresponding to pendingTransport/activeTransport, or at beginning if
97
   * both are null.
98
   *
99
   * <p>Note: any {@link Index#updateGroups(List)} should also update {@link #addressGroups}.
100
   */
101
  private final Index addressIndex;
102

103
  /**
104
   * A volatile accessor to {@link Index#getGroups()}. A few methods ({@link
105
   * #getAddressGroups()} and {@link #toString()}) read this value without running in the {@link
106
   * #syncContext}, which reading the index directly would require. Ideally {@link
107
   * Index#getGroups()} would itself be volatile, so this accessor would not be needed. Keeping a
108
   * separate accessor, however, avoids unnecessary volatile reads and makes the intent explicit.
109
   */
110
  private volatile List<EquivalentAddressGroup> addressGroups;
111

112
  /**
113
   * The policy to control back off between reconnects. Non-{@code null} when a reconnect task is
114
   * scheduled.
115
   */
116
  private BackoffPolicy reconnectPolicy;
117

118
  /**
119
   * Timer monitoring duration since entering CONNECTING state.
120
   */
121
  private final Stopwatch connectingTimer;
122

123
  @Nullable
124
  private ScheduledHandle reconnectTask;
125
  @Nullable
126
  private ScheduledHandle shutdownDueToUpdateTask;
127
  @Nullable
128
  private ManagedClientTransport shutdownDueToUpdateTransport;
129

130
  /**
131
   * All transports that are not terminated. At the very least the value of {@link #activeTransport}
132
   * will be present, but previously used transports that still have streams or are stopping may
133
   * also be present.
134
   */
135
  private final Collection<ConnectionClientTransport> transports = new ArrayList<>();
1✔
136

137
  // Must only be used from syncContext
138
  private final InUseStateAggregator<ConnectionClientTransport> inUseStateAggregator =
1✔
139
      new InUseStateAggregator<ConnectionClientTransport>() {
1✔
140
        @Override
141
        protected void handleInUse() {
142
          callback.onInUse(InternalSubchannel.this);
1✔
143
        }
1✔
144

145
        @Override
146
        protected void handleNotInUse() {
147
          callback.onNotInUse(InternalSubchannel.this);
1✔
148
        }
1✔
149
      };
150

151
  /**
152
   * The to-be active transport, which is not ready yet.
153
   */
154
  @Nullable
155
  private ConnectionClientTransport pendingTransport;
156

157
  /**
158
   * The transport for new outgoing requests. Non-null only in READY state.
159
   */
160
  @Nullable
161
  private volatile ManagedClientTransport activeTransport;
162

163
  private volatile ConnectivityStateInfo state = ConnectivityStateInfo.forNonError(IDLE);
1✔
164

165
  private Status shutdownReason;
166

167
  private volatile Attributes connectedAddressAttributes;
168
  private final SubchannelMetrics subchannelMetrics;
169
  private final String target;
170

171
  InternalSubchannel(LoadBalancer.CreateSubchannelArgs args, String authority, String userAgent,
172
                     BackoffPolicy.Provider backoffPolicyProvider,
173
                     ClientTransportFactory transportFactory,
174
                     ScheduledExecutorService scheduledExecutor,
175
                     Supplier<Stopwatch> stopwatchSupplier, SynchronizationContext syncContext,
176
                     Callback callback, InternalChannelz channelz, CallTracer callsTracer,
177
                     ChannelTracer channelTracer, InternalLogId logId,
178
                     ChannelLogger channelLogger, List<ClientTransportFilter> transportFilters,
179
                     String target,
180
                     MetricRecorder metricRecorder) {
1✔
181
    List<EquivalentAddressGroup> addressGroups = args.getAddresses();
1✔
182
    Preconditions.checkNotNull(addressGroups, "addressGroups");
1✔
183
    Preconditions.checkArgument(!addressGroups.isEmpty(), "addressGroups is empty");
1✔
184
    checkListHasNoNulls(addressGroups, "addressGroups contains null entry");
1✔
185
    List<EquivalentAddressGroup> unmodifiableAddressGroups =
1✔
186
        Collections.unmodifiableList(new ArrayList<>(addressGroups));
1✔
187
    this.addressGroups = unmodifiableAddressGroups;
1✔
188
    this.addressIndex = new Index(unmodifiableAddressGroups);
1✔
189
    this.authority = authority;
1✔
190
    this.userAgent = userAgent;
1✔
191
    this.backoffPolicyProvider = backoffPolicyProvider;
1✔
192
    this.transportFactory = transportFactory;
1✔
193
    this.scheduledExecutor = scheduledExecutor;
1✔
194
    this.connectingTimer = stopwatchSupplier.get();
1✔
195
    this.syncContext = syncContext;
1✔
196
    this.metricRecorder = metricRecorder;
1✔
197
    this.callback = callback;
1✔
198
    this.channelz = channelz;
1✔
199
    this.callsTracer = callsTracer;
1✔
200
    this.channelTracer = Preconditions.checkNotNull(channelTracer, "channelTracer");
1✔
201
    this.logId = Preconditions.checkNotNull(logId, "logId");
1✔
202
    this.channelLogger = Preconditions.checkNotNull(channelLogger, "channelLogger");
1✔
203
    this.transportFilters = transportFilters;
1✔
204
    this.reconnectDisabled = args.getOption(LoadBalancer.DISABLE_SUBCHANNEL_RECONNECT_KEY);
1✔
205
    this.target = target;
1✔
206
    this.subchannelMetrics = new SubchannelMetrics(metricRecorder);
1✔
207
  }
1✔
208

209
  ChannelLogger getChannelLogger() {
210
    return channelLogger;
1✔
211
  }
212

213
  @Override
214
  public ClientTransport obtainActiveTransport() {
215
    ClientTransport savedTransport = activeTransport;
1✔
216
    if (savedTransport != null) {
1✔
217
      return savedTransport;
1✔
218
    }
219
    syncContext.execute(new Runnable() {
1✔
220
      @Override
221
      public void run() {
222
        if (state.getState() == IDLE) {
1✔
223
          channelLogger.log(ChannelLogLevel.INFO, "CONNECTING as requested");
1✔
224
          gotoNonErrorState(CONNECTING);
1✔
225
          startNewTransport();
1✔
226
        }
227
      }
1✔
228
    });
229
    return null;
1✔
230
  }
231

232
  /**
233
   * Returns a READY transport if there is any, without trying to connect.
234
   */
235
  @Nullable
236
  ClientTransport getTransport() {
237
    return activeTransport;
1✔
238
  }
239

240
  /**
241
   * Returns the authority string associated with this Subchannel.
242
   */
243
  String getAuthority() {
244
    return authority;
×
245
  }
246

247
  private void startNewTransport() {
248
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
249

250
    Preconditions.checkState(reconnectTask == null, "Should have no reconnectTask scheduled");
1✔
251

252
    if (addressIndex.isAtBeginning()) {
1✔
253
      connectingTimer.reset().start();
1✔
254
    }
255
    SocketAddress address = addressIndex.getCurrentAddress();
1✔
256

257
    HttpConnectProxiedSocketAddress proxiedAddr = null;
1✔
258
    if (address instanceof HttpConnectProxiedSocketAddress) {
1✔
259
      proxiedAddr = (HttpConnectProxiedSocketAddress) address;
×
260
      address = proxiedAddr.getTargetAddress();
×
261
    }
262

263
    Attributes currentEagAttributes = addressIndex.getCurrentEagAttributes();
1✔
264
    String eagChannelAuthority = currentEagAttributes
1✔
265
            .get(EquivalentAddressGroup.ATTR_AUTHORITY_OVERRIDE);
1✔
266
    ClientTransportFactory.ClientTransportOptions options =
1✔
267
        new ClientTransportFactory.ClientTransportOptions()
268
          .setAuthority(eagChannelAuthority != null ? eagChannelAuthority : authority)
1✔
269
          .setEagAttributes(currentEagAttributes)
1✔
270
          .setUserAgent(userAgent)
1✔
271
          .setMetricRecorder(metricRecorder)
1✔
272
          .setHttpConnectProxiedSocketAddress(proxiedAddr);
1✔
273
    TransportLogger transportLogger = new TransportLogger();
1✔
274
    // In case the transport logs in the constructor, use the subchannel logId
275
    transportLogger.logId = getLogId();
1✔
276
    ConnectionClientTransport transport =
1✔
277
        new CallTracingTransport(
278
            transportFactory
279
                .newClientTransport(address, options, transportLogger), callsTracer);
1✔
280
    transportLogger.logId = transport.getLogId();
1✔
281
    channelz.addClientSocket(transport);
1✔
282
    pendingTransport = transport;
1✔
283
    transports.add(transport);
1✔
284
    Runnable runnable = transport.start(new TransportListener(transport));
1✔
285
    if (runnable != null) {
1✔
286
      syncContext.executeLater(runnable);
1✔
287
    }
288
    channelLogger.log(ChannelLogLevel.INFO, "Started transport {0}", transportLogger.logId);
1✔
289
  }
1✔
290

291
  /**
292
   * Only called after all addresses attempted and failed (TRANSIENT_FAILURE).
293
   * @param status the causal status when the channel begins transition to
294
   *     TRANSIENT_FAILURE.
295
   */
296
  private void scheduleBackoff(final Status status) {
297
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
298

299
    class EndOfCurrentBackoff implements Runnable {
1✔
300
      @Override
301
      public void run() {
302
        reconnectTask = null;
1✔
303
        channelLogger.log(ChannelLogLevel.INFO, "CONNECTING after backoff");
1✔
304
        gotoNonErrorState(CONNECTING);
1✔
305
        startNewTransport();
1✔
306
      }
1✔
307
    }
308

309
    gotoState(ConnectivityStateInfo.forTransientFailure(status));
1✔
310

311
    if (reconnectDisabled) {
1✔
312
      return;
1✔
313
    }
314

315
    if (reconnectPolicy == null) {
1✔
316
      reconnectPolicy = backoffPolicyProvider.get();
1✔
317
    }
318
    long delayNanos =
1✔
319
        reconnectPolicy.nextBackoffNanos() - connectingTimer.elapsed(TimeUnit.NANOSECONDS);
1✔
320
    channelLogger.log(
1✔
321
        ChannelLogLevel.INFO,
322
        "TRANSIENT_FAILURE ({0}). Will reconnect after {1} ns",
323
        printShortStatus(status), delayNanos);
1✔
324
    Preconditions.checkState(reconnectTask == null, "previous reconnectTask is not done");
1✔
325
    reconnectTask = syncContext.schedule(
1✔
326
        new EndOfCurrentBackoff(),
327
        delayNanos,
328
        TimeUnit.NANOSECONDS,
329
        scheduledExecutor);
330
  }
1✔
331

332
  /**
333
   * Immediately attempt to reconnect if the current state is TRANSIENT_FAILURE. Otherwise, this
334
   * method has no effect.
335
   */
336
  void resetConnectBackoff() {
337
    syncContext.execute(new Runnable() {
1✔
338
      @Override
339
      public void run() {
340
        if (state.getState() != TRANSIENT_FAILURE) {
1✔
341
          return;
1✔
342
        }
343
        cancelReconnectTask();
1✔
344
        channelLogger.log(ChannelLogLevel.INFO, "CONNECTING; backoff interrupted");
1✔
345
        gotoNonErrorState(CONNECTING);
1✔
346
        startNewTransport();
1✔
347
      }
1✔
348
    });
349
  }
1✔
350

351
  private void gotoNonErrorState(final ConnectivityState newState) {
352
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
353

354
    gotoState(ConnectivityStateInfo.forNonError(newState));
1✔
355
  }
1✔
356

357
  private void gotoState(final ConnectivityStateInfo newState) {
358
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
359

360
    if (state.getState() != newState.getState()) {
1✔
361
      Preconditions.checkState(state.getState() != SHUTDOWN,
1✔
362
          "Cannot transition out of SHUTDOWN to %s", newState.getState());
1✔
363
      if (reconnectDisabled && newState.getState() == TRANSIENT_FAILURE) {
1✔
364
        state = ConnectivityStateInfo.forNonError(IDLE);
1✔
365
      } else {
366
        state = newState;
1✔
367
      }
368
      callback.onStateChange(InternalSubchannel.this, newState);
1✔
369
    }
370
  }
1✔
371

372
  /** Replaces the existing addresses, avoiding unnecessary reconnects. */
373
  public void updateAddresses(final List<EquivalentAddressGroup> newAddressGroups) {
374
    Preconditions.checkNotNull(newAddressGroups, "newAddressGroups");
1✔
375
    checkListHasNoNulls(newAddressGroups, "newAddressGroups contains null entry");
1✔
376
    Preconditions.checkArgument(!newAddressGroups.isEmpty(), "newAddressGroups is empty");
1✔
377
    final List<EquivalentAddressGroup> newImmutableAddressGroups =
1✔
378
        Collections.unmodifiableList(new ArrayList<>(newAddressGroups));
1✔
379

380
    syncContext.execute(new Runnable() {
1✔
381
      @Override
382
      public void run() {
383
        ManagedClientTransport savedTransport = null;
1✔
384
        SocketAddress previousAddress = addressIndex.getCurrentAddress();
1✔
385
        addressIndex.updateGroups(newImmutableAddressGroups);
1✔
386
        addressGroups = newImmutableAddressGroups;
1✔
387
        if (state.getState() == READY || state.getState() == CONNECTING) {
1✔
388
          if (!addressIndex.seekTo(previousAddress)) {
1✔
389
            // Forced to drop the connection
390
            if (state.getState() == READY) {
1✔
391
              savedTransport = activeTransport;
1✔
392
              activeTransport = null;
1✔
393
              addressIndex.reset();
1✔
394
              gotoNonErrorState(IDLE);
1✔
395
            } else {
396
              pendingTransport.shutdown(
1✔
397
                  Status.UNAVAILABLE.withDescription(
1✔
398
                    "InternalSubchannel closed pending transport due to address change"));
399
              pendingTransport = null;
1✔
400
              addressIndex.reset();
1✔
401
              startNewTransport();
1✔
402
            }
403
          }
404
        }
405
        if (savedTransport != null) {
1✔
406
          if (shutdownDueToUpdateTask != null) {
1✔
407
            // Keeping track of multiple shutdown tasks adds complexity, and shouldn't generally be
408
            // necessary. This transport has probably already had plenty of time.
409
            shutdownDueToUpdateTransport.shutdown(
1✔
410
                Status.UNAVAILABLE.withDescription(
1✔
411
                    "InternalSubchannel closed transport early due to address change"));
412
            shutdownDueToUpdateTask.cancel();
1✔
413
            shutdownDueToUpdateTask = null;
1✔
414
            shutdownDueToUpdateTransport = null;
1✔
415
          }
416
          // Avoid needless RPC failures by delaying the shutdown. See
417
          // https://github.com/grpc/grpc-java/issues/2562
418
          shutdownDueToUpdateTransport = savedTransport;
1✔
419
          shutdownDueToUpdateTask = syncContext.schedule(
1✔
420
              new Runnable() {
1✔
421
                @Override public void run() {
422
                  ManagedClientTransport transport = shutdownDueToUpdateTransport;
1✔
423
                  shutdownDueToUpdateTask = null;
1✔
424
                  shutdownDueToUpdateTransport = null;
1✔
425
                  transport.shutdown(
1✔
426
                      Status.UNAVAILABLE.withDescription(
1✔
427
                          "InternalSubchannel closed transport due to address change"));
428
                }
1✔
429
              },
430
              ManagedChannelImpl.SUBCHANNEL_SHUTDOWN_DELAY_SECONDS,
431
              TimeUnit.SECONDS,
432
              scheduledExecutor);
1✔
433
        }
434
      }
1✔
435
    });
436
  }
1✔
437

438
  public void shutdown(final Status reason) {
439
    syncContext.execute(new Runnable() {
1✔
440
      @Override
441
      public void run() {
442
        ManagedClientTransport savedActiveTransport;
443
        ConnectionClientTransport savedPendingTransport;
444
        if (state.getState() == SHUTDOWN) {
1✔
445
          return;
1✔
446
        }
447
        shutdownReason = reason;
1✔
448
        savedActiveTransport = activeTransport;
1✔
449
        savedPendingTransport = pendingTransport;
1✔
450
        activeTransport = null;
1✔
451
        pendingTransport = null;
1✔
452
        gotoNonErrorState(SHUTDOWN);
1✔
453
        addressIndex.reset();
1✔
454
        if (transports.isEmpty()) {
1✔
455
          handleTermination();
1✔
456
        }  // else: the callback will be run once all transports have been terminated
457
        cancelReconnectTask();
1✔
458
        if (shutdownDueToUpdateTask != null) {
1✔
459
          shutdownDueToUpdateTask.cancel();
1✔
460
          shutdownDueToUpdateTransport.shutdown(reason);
1✔
461
          shutdownDueToUpdateTask = null;
1✔
462
          shutdownDueToUpdateTransport = null;
1✔
463
        }
464
        if (savedActiveTransport != null) {
1✔
465
          savedActiveTransport.shutdown(reason);
1✔
466
        }
467
        if (savedPendingTransport != null) {
1✔
468
          savedPendingTransport.shutdown(reason);
1✔
469
        }
470
      }
1✔
471
    });
472
  }
1✔
473

474
  @Override
475
  public String toString() {
476
    // addressGroupsCopy being a little stale is fine, just avoid calling toString with the lock
477
    // since there may be many addresses.
478
    return MoreObjects.toStringHelper(this)
×
479
        .add("logId", logId.getId())
×
480
        .add("addressGroups", addressGroups)
×
481
        .toString();
×
482
  }
483

484
  private void handleTermination() {
485
    syncContext.execute(new Runnable() {
1✔
486
      @Override
487
      public void run() {
488
        channelLogger.log(ChannelLogLevel.INFO, "Terminated");
1✔
489
        callback.onTerminated(InternalSubchannel.this);
1✔
490
      }
1✔
491
    });
492
  }
1✔
493

494
  private void handleTransportInUseState(
495
      final ConnectionClientTransport transport, final boolean inUse) {
496
    syncContext.execute(new Runnable() {
1✔
497
      @Override
498
      public void run() {
499
        inUseStateAggregator.updateObjectInUse(transport, inUse);
1✔
500
      }
1✔
501
    });
502
  }
1✔
503

504
  void shutdownNow(final Status reason) {
505
    shutdown(reason);
1✔
506
    syncContext.execute(new Runnable() {
1✔
507
      @Override
508
      public void run() {
509
        Collection<ManagedClientTransport> transportsCopy =
1✔
510
            new ArrayList<ManagedClientTransport>(transports);
1✔
511

512
        for (ManagedClientTransport transport : transportsCopy) {
1✔
513
          transport.shutdownNow(reason);
1✔
514
        }
1✔
515
      }
1✔
516
    });
517
  }
1✔
518

519
  List<EquivalentAddressGroup> getAddressGroups() {
520
    return addressGroups;
1✔
521
  }
522

523
  private void cancelReconnectTask() {
524
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
525

526
    if (reconnectTask != null) {
1✔
527
      reconnectTask.cancel();
1✔
528
      reconnectTask = null;
1✔
529
      reconnectPolicy = null;
1✔
530
    }
531
  }
1✔
532

533
  @Override
534
  public InternalLogId getLogId() {
535
    return logId;
1✔
536
  }
537

538
  @Override
539
  public ListenableFuture<ChannelStats> getStats() {
540
    final SettableFuture<ChannelStats> channelStatsFuture = SettableFuture.create();
1✔
541
    syncContext.execute(new Runnable() {
1✔
542
      @Override
543
      public void run() {
544
        ChannelStats.Builder builder = new ChannelStats.Builder();
1✔
545
        List<EquivalentAddressGroup> addressGroupsSnapshot = addressIndex.getGroups();
1✔
546
        List<InternalWithLogId> transportsSnapshot = new ArrayList<InternalWithLogId>(transports);
1✔
547
        builder.setTarget(addressGroupsSnapshot.toString()).setState(getState());
1✔
548
        builder.setSockets(transportsSnapshot);
1✔
549
        callsTracer.updateBuilder(builder);
1✔
550
        channelTracer.updateBuilder(builder);
1✔
551
        channelStatsFuture.set(builder.build());
1✔
552
      }
1✔
553
    });
554
    return channelStatsFuture;
1✔
555
  }
556

557
  /**
558
   * Return attributes for server address connected by sub channel.
559
   */
560
  public Attributes getConnectedAddressAttributes() {
561
    return connectedAddressAttributes;
1✔
562
  }
563

564
  ConnectivityState getState() {
565
    return state.getState();
1✔
566
  }
567

568
  private static void checkListHasNoNulls(List<?> list, String msg) {
569
    for (Object item : list) {
1✔
570
      Preconditions.checkNotNull(item, msg);
1✔
571
    }
1✔
572
  }
1✔
573

574
  /** Listener for real transports. */
575
  private class TransportListener implements ManagedClientTransport.Listener {
576
    final ConnectionClientTransport transport;
577
    boolean shutdownInitiated = false;
1✔
578

579
    TransportListener(ConnectionClientTransport transport) {
1✔
580
      this.transport = transport;
1✔
581
    }
1✔
582

583
    @Override
584
    public Attributes filterTransport(Attributes attributes) {
585
      for (ClientTransportFilter filter : transportFilters) {
1✔
586
        attributes = Preconditions.checkNotNull(filter.transportReady(attributes),
1✔
587
            "Filter %s returned null", filter);
588
      }
1✔
589
      return attributes;
1✔
590
    }
591

592
    @Override
593
    public void transportReady() {
594
      channelLogger.log(ChannelLogLevel.INFO, "READY");
1✔
595
      syncContext.execute(new Runnable() {
1✔
596
        @Override
597
        public void run() {
598
          reconnectPolicy = null;
1✔
599
          if (shutdownReason != null) {
1✔
600
            // activeTransport should have already been set to null by shutdown(). We keep it null.
601
            Preconditions.checkState(activeTransport == null,
1✔
602
                "Unexpected non-null activeTransport");
603
            transport.shutdown(shutdownReason);
1✔
604
          } else if (pendingTransport == transport) {
1✔
605
            activeTransport = transport;
1✔
606
            pendingTransport = null;
1✔
607
            connectedAddressAttributes = addressIndex.getCurrentEagAttributes();
1✔
608
            gotoNonErrorState(READY);
1✔
609
            subchannelMetrics.recordConnectionAttemptSucceeded(/* target= */ target,
1✔
610
                /* backendService= */ getBackendServiceOrDefault(
1✔
611
                    addressIndex.getCurrentEagAttributes()),
1✔
612
                /* locality= */ getAttributeOrDefault(addressIndex.getCurrentEagAttributes(),
1✔
613
                    EquivalentAddressGroup.ATTR_LOCALITY_NAME),
614
                /* securityLevel= */ extractSecurityLevel(addressIndex.getCurrentEagAttributes()
1✔
615
                    .get(GrpcAttributes.ATTR_SECURITY_LEVEL)));
1✔
616
          }
617
        }
1✔
618
      });
619
    }
1✔
620

621
    @Override
622
    public void transportInUse(boolean inUse) {
623
      handleTransportInUseState(transport, inUse);
1✔
624
    }
1✔
625

626
    @Override
627
    public void transportShutdown(final Status s, final DisconnectError disconnectError) {
628
      channelLogger.log(
1✔
629
          ChannelLogLevel.INFO, "{0} SHUTDOWN with {1}", transport.getLogId(), printShortStatus(s));
1✔
630
      shutdownInitiated = true;
1✔
631
      syncContext.execute(new Runnable() {
1✔
632
        @Override
633
        public void run() {
634
          if (state.getState() == SHUTDOWN) {
1✔
635
            return;
1✔
636
          }
637
          if (activeTransport == transport) {
1✔
638
            activeTransport = null;
1✔
639
            addressIndex.reset();
1✔
640
            gotoNonErrorState(IDLE);
1✔
641
            subchannelMetrics.recordDisconnection(/* target= */ target,
1✔
642
                /* backendService= */ getBackendServiceOrDefault(
1✔
643
                    addressIndex.getCurrentEagAttributes()),
1✔
644
                /* locality= */ getAttributeOrDefault(addressIndex.getCurrentEagAttributes(),
1✔
645
                    EquivalentAddressGroup.ATTR_LOCALITY_NAME),
646
                /* disconnectError= */ disconnectError.toErrorString(),
1✔
647
                /* securityLevel= */ extractSecurityLevel(addressIndex.getCurrentEagAttributes()
1✔
648
                    .get(GrpcAttributes.ATTR_SECURITY_LEVEL)));
1✔
649
          } else if (pendingTransport == transport) {
1✔
650
            subchannelMetrics.recordConnectionAttemptFailed(/* target= */ target,
1✔
651
                /* backendService= */ getBackendServiceOrDefault(
1✔
652
                    addressIndex.getCurrentEagAttributes()),
1✔
653
                /* locality= */ getAttributeOrDefault(addressIndex.getCurrentEagAttributes(),
1✔
654
                    EquivalentAddressGroup.ATTR_LOCALITY_NAME));
655
            Preconditions.checkState(state.getState() == CONNECTING,
1✔
656
                "Expected state is CONNECTING, actual state is %s", state.getState());
1✔
657
            addressIndex.increment();
1✔
658
            // Continue to reconnect if there are still addresses to try.
659
            if (!addressIndex.isValid()) {
1✔
660
              pendingTransport = null;
1✔
661
              addressIndex.reset();
1✔
662
              // Initiate backoff
663
              // Transition to TRANSIENT_FAILURE
664
              scheduleBackoff(s);
1✔
665
            } else {
666
              startNewTransport();
1✔
667
            }
668
          }
669
        }
1✔
670
      });
671
    }
1✔
672

673
    @Override
674
    public void transportTerminated() {
675
      Preconditions.checkState(
1✔
676
          shutdownInitiated, "transportShutdown() must be called before transportTerminated().");
677

678
      channelLogger.log(ChannelLogLevel.INFO, "{0} Terminated", transport.getLogId());
1✔
679
      channelz.removeClientSocket(transport);
1✔
680
      handleTransportInUseState(transport, false);
1✔
681
      for (ClientTransportFilter filter : transportFilters) {
1✔
682
        filter.transportTerminated(transport.getAttributes());
1✔
683
      }
1✔
684
      syncContext.execute(new Runnable() {
1✔
685
        @Override
686
        public void run() {
687
          transports.remove(transport);
1✔
688
          if (state.getState() == SHUTDOWN && transports.isEmpty()) {
1✔
689
            handleTermination();
1✔
690
          }
691
        }
1✔
692
      });
693
    }
1✔
694

695
    private String extractSecurityLevel(SecurityLevel securityLevel) {
696
      if (securityLevel == null) {
1✔
697
        return "none";
1✔
698
      }
699
      switch (securityLevel) {
1✔
700
        case NONE:
701
          return "none";
×
702
        case INTEGRITY:
703
          return "integrity_only";
×
704
        case PRIVACY_AND_INTEGRITY:
705
          return "privacy_and_integrity";
1✔
706
        default:
707
          throw new IllegalArgumentException("Unknown SecurityLevel: " + securityLevel);
×
708
      }
709
    }
710

711
    private String getAttributeOrDefault(Attributes attributes, Attributes.Key<String> key) {
712
      String value = attributes.get(key);
1✔
713
      return value == null ? "" : value;
1✔
714
    }
715

716
    private String getBackendServiceOrDefault(Attributes attributes) {
717
      String value = attributes.get(InternalEquivalentAddressGroup.ATTR_BACKEND_SERVICE);
1✔
718
      if (value == null) {
1✔
719
        value = attributes.get(NameResolver.ATTR_BACKEND_SERVICE);
1✔
720
      }
721
      return value == null ? "" : value;
1✔
722
    }
723
  }
724

725
  // All methods are called in syncContext
726
  abstract static class Callback {
1✔
727
    /**
728
     * Called when the subchannel is terminated, which means it's shut down and all transports
729
     * have been terminated.
730
     */
731
    @ForOverride
732
    void onTerminated(InternalSubchannel is) { }
×
733

734
    /**
735
     * Called when the subchannel's connectivity state has changed.
736
     */
737
    @ForOverride
738
    void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) { }
×
739

740
    /**
741
     * Called when the subchannel's in-use state has changed to true, which means at least one
742
     * transport is in use.
743
     */
744
    @ForOverride
745
    void onInUse(InternalSubchannel is) { }
×
746

747
    /**
748
     * Called when the subchannel's in-use state has changed to false, which means no transport is
749
     * in use.
750
     */
751
    @ForOverride
752
    void onNotInUse(InternalSubchannel is) { }
×
753
  }
754

755
  @VisibleForTesting
756
  static final class CallTracingTransport extends ForwardingConnectionClientTransport {
757
    private final ConnectionClientTransport delegate;
758
    private final CallTracer callTracer;
759

760
    private CallTracingTransport(ConnectionClientTransport delegate, CallTracer callTracer) {
1✔
761
      this.delegate = delegate;
1✔
762
      this.callTracer = callTracer;
1✔
763
    }
1✔
764

765
    @Override
766
    protected ConnectionClientTransport delegate() {
767
      return delegate;
1✔
768
    }
769

770
    @Override
771
    public ClientStream newStream(
772
        MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions,
773
        ClientStreamTracer[] tracers) {
774
      final ClientStream streamDelegate = super.newStream(method, headers, callOptions, tracers);
1✔
775
      return new ForwardingClientStream() {
1✔
776
        @Override
777
        protected ClientStream delegate() {
778
          return streamDelegate;
1✔
779
        }
780

781
        @Override
782
        public void start(final ClientStreamListener listener) {
783
          callTracer.reportCallStarted();
1✔
784
          super.start(new ForwardingClientStreamListener() {
1✔
785
            @Override
786
            protected ClientStreamListener delegate() {
787
              return listener;
1✔
788
            }
789

790
            @Override
791
            public void closed(
792
                Status status, RpcProgress rpcProgress, Metadata trailers) {
793
              callTracer.reportCallEnded(status.isOk());
1✔
794
              super.closed(status, rpcProgress, trailers);
1✔
795
            }
1✔
796
          });
797
        }
1✔
798
      };
799
    }
800
  }
801

802
  /** Index as in 'i', the pointer to an entry. Not a "search index." */
803
  @VisibleForTesting
804
  static final class Index {
805
    private List<EquivalentAddressGroup> addressGroups;
806
    private int groupIndex;
807
    private int addressIndex;
808

809
    public Index(List<EquivalentAddressGroup> groups) {
1✔
810
      this.addressGroups = groups;
1✔
811
    }
1✔
812

813
    public boolean isValid() {
814
      // addressIndex will never be invalid
815
      return groupIndex < addressGroups.size();
1✔
816
    }
817

818
    public boolean isAtBeginning() {
819
      return groupIndex == 0 && addressIndex == 0;
1✔
820
    }
821

822
    public void increment() {
823
      EquivalentAddressGroup group = addressGroups.get(groupIndex);
1✔
824
      addressIndex++;
1✔
825
      if (addressIndex >= group.getAddresses().size()) {
1✔
826
        groupIndex++;
1✔
827
        addressIndex = 0;
1✔
828
      }
829
    }
1✔
830

831
    public void reset() {
832
      groupIndex = 0;
1✔
833
      addressIndex = 0;
1✔
834
    }
1✔
835

836
    public SocketAddress getCurrentAddress() {
837
      return addressGroups.get(groupIndex).getAddresses().get(addressIndex);
1✔
838
    }
839

840
    public Attributes getCurrentEagAttributes() {
841
      return addressGroups.get(groupIndex).getAttributes();
1✔
842
    }
843

844
    public List<EquivalentAddressGroup> getGroups() {
845
      return addressGroups;
1✔
846
    }
847

848
    /** Update to new groups, resetting the current index. */
849
    public void updateGroups(List<EquivalentAddressGroup> newGroups) {
850
      addressGroups = newGroups;
1✔
851
      reset();
1✔
852
    }
1✔
853

854
    /** Returns false if the needle was not found and the current index was left unchanged. */
855
    public boolean seekTo(SocketAddress needle) {
856
      for (int i = 0; i < addressGroups.size(); i++) {
1✔
857
        EquivalentAddressGroup group = addressGroups.get(i);
1✔
858
        int j = group.getAddresses().indexOf(needle);
1✔
859
        if (j == -1) {
1✔
860
          continue;
1✔
861
        }
862
        this.groupIndex = i;
1✔
863
        this.addressIndex = j;
1✔
864
        return true;
1✔
865
      }
866
      return false;
1✔
867
    }
868
  }
869

870
  private String printShortStatus(Status status) {
871
    StringBuilder buffer = new StringBuilder();
1✔
872
    buffer.append(status.getCode());
1✔
873
    if (status.getDescription() != null) {
1✔
874
      buffer.append("(").append(status.getDescription()).append(")");
1✔
875
    }
876
    if (status.getCause() != null) {
1✔
877
      buffer.append("[").append(status.getCause()).append("]");
1✔
878
    }
879
    return buffer.toString();
1✔
880
  }
881

882
  @VisibleForTesting
883
  static final class TransportLogger extends ChannelLogger {
1✔
884
    // Changed just after construction to break a cyclic dependency.
885
    InternalLogId logId;
886

887
    @Override
888
    public void log(ChannelLogLevel level, String message) {
889
      ChannelLoggerImpl.logOnly(logId, level, message);
1✔
890
    }
1✔
891

892
    @Override
893
    public void log(ChannelLogLevel level, String messageFormat, Object... args) {
894
      ChannelLoggerImpl.logOnly(logId, level, messageFormat, args);
1✔
895
    }
1✔
896
  }
897
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc